revisited declaration of attributes for node and application
[nepi.git] / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.trace import Trace, TraceAttr
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import tnow, tdiffsec
26
27 import os
28 import subprocess
29
30 # TODO: Resolve wildcards in commands!!
31 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
32
33 @clsinit_copy
34 class LinuxApplication(ResourceManager):
35     """
36     .. class:: Class Args :
37       
38         :param ec: The Experiment controller
39         :type ec: ExperimentController
40         :param guid: guid of the RM
41         :type guid: int
42
43     .. note::
44
45         A LinuxApplication RM represents a process that can be executed in
46         a remote Linux host using SSH.
47
48         The LinuxApplication RM takes care of uploadin sources and any files
49         needed to run the experiment, to the remote host. 
50         It also allows to provide source compilation (build) and installation 
51         instructions, and takes care of automating the sources build and 
52         installation tasks for the user.
53
54         It is important to note that files uploaded to the remote host have
55         two possible scopes: single-experiment or multi-experiment.
56         Single experiment files are those that will not be re-used by other 
57         experiments. Multi-experiment files are those that will.
58         Sources and shared files are always made available to all experiments.
59
60         Directory structure:
61
62         The directory structure used by LinuxApplication RM at the Linux
63         host is the following:
64
65         ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
66                       |
67         ${LIB}        |- /lib --> Base directory for libraries
68         ${BIN}        |- /bin --> Base directory for binary files
69         ${SRC}        |- /src --> Base directory for sources
70         ${SHARE}      |- /share --> Base directory for other files
71
72         ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
73                       |
74         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
75                           |
76         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
77                                |     specific files (e.g. command.sh, input)
78                                | 
79         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
80
81     """
82
83     _rtype = "linux::Application"
84     _help = "Runs an application on a Linux host with a BASH command "
85     _platform = "linux"
86
87     @classmethod
88     def _register_attributes(cls):
89         cls._register_attribute(
90             Attribute("command", "Command to execute at application start. "
91                       "Note that commands will be executed in the ${RUN_HOME} directory, "
92                       "make sure to take this into account when using relative paths. ", 
93                       flags = Flags.Design))
94         cls._register_attribute(
95             Attribute("forwardX11",
96                       "Enables X11 forwarding for SSH connections", 
97                       flags = Flags.Design))
98         cls._register_attribute(
99             Attribute("env",
100                       "Environment variables string for command execution",
101                       flags = Flags.Design))
102         cls._register_attribute(
103             Attribute("sudo",
104                       "Run with root privileges", 
105                       flags = Flags.Design))
106         cls._register_attribute(
107             Attribute("depends", 
108                       "Space-separated list of packages required to run the application",
109                       flags = Flags.Design))
110         cls._register_attribute(
111             Attribute("sources", 
112                       "semi-colon separated list of regular files to be uploaded to ${SRC} "
113                       "directory prior to building. Archives won't be expanded automatically. "
114                       "Sources are globally available for all experiments unless "
115                       "cleanHome is set to True (This will delete all sources). ",
116                       flags = Flags.Design))
117         cls._register_attribute(
118             Attribute("files", 
119                       "semi-colon separated list of regular miscellaneous files to be uploaded "
120                       "to ${SHARE} directory. "
121                       "Files are globally available for all experiments unless "
122                       "cleanHome is set to True (This will delete all files). ",
123                       flags = Flags.Design))
124         cls._register_attribute(
125             Attribute("libs", 
126                       "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
127                       "to ${LIB} directory. "
128                       "Libraries are globally available for all experiments unless "
129                       "cleanHome is set to True (This will delete all files). ",
130                       flags = Flags.Design))
131         cls._register_attribute(
132             Attribute("bins", 
133                       "semi-colon separated list of binary files to be uploaded "
134                       "to ${BIN} directory. "
135                       "Binaries are globally available for all experiments unless "
136                       "cleanHome is set to True (This will delete all files). ",
137                       flags = Flags.Design))
138         cls._register_attribute(
139             Attribute("code", 
140                       "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
141                       flags = Flags.Design))
142         cls._register_attribute(
143             Attribute("build", 
144                       "Build commands to execute after deploying the sources. "
145                       "Sources are uploaded to the ${SRC} directory and code "
146                       "is uploaded to the ${APP_HOME} directory. \n"
147                       "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
148                       "./configure && make && make clean.\n"
149                       "Make sure to make the build commands return with a nonzero exit "
150                       "code on error.",
151                       flags = Flags.Design))
152         cls._register_attribute(
153             Attribute("install", 
154                       "Commands to transfer built files to their final destinations. "
155                       "Install commands are executed after build commands. ",
156                       flags = Flags.Design))
157         cls._register_attribute(
158             Attribute("stdin", "Standard input for the 'command'", 
159                       flags = Flags.Design))
160         cls._register_attribute(
161             Attribute("tearDown", "Command to be executed just before " 
162                       "releasing the resource", 
163                       flags = Flags.Design))
164
165     @classmethod
166     def _register_traces(cls):
167         stdout = Trace("stdout", "Standard output stream", enabled = True)
168         stderr = Trace("stderr", "Standard error stream", enabled = True)
169
170         cls._register_trace(stdout)
171         cls._register_trace(stderr)
172
173     def __init__(self, ec, guid):
174         super(LinuxApplication, self).__init__(ec, guid)
175         self._pid = None
176         self._ppid = None
177         self._node = None
178         self._home = "app-%s" % self.guid
179
180         # whether the command should run in foreground attached
181         # to a terminal
182         self._in_foreground = False
183
184         # whether to use sudo to kill the application process
185         self._sudo_kill = False
186
187         # keep a reference to the running process handler when 
188         # the command is not executed as remote daemon in background
189         self._proc = None
190
191         # timestamp of last state check of the application
192         self._last_state_check = tnow()
193         
194     def log_message(self, msg):
195         return " guid %d - host %s - %s " % (self.guid, 
196                 self.node.get("hostname"), msg)
197
198     @property
199     def node(self):
200         if not self._node:
201             node = self.get_connected(LinuxNode.get_rtype())
202             if not node: 
203                 msg = "Application %s guid %d NOT connected to Node" % (
204                         self._rtype, self.guid)
205                 raise RuntimeError(msg)
206
207             self._node = node[0]
208
209         return self._node
210
211     @property
212     def app_home(self):
213         return os.path.join(self.node.exp_home, self._home)
214
215     @property
216     def run_home(self):
217         return os.path.join(self.app_home, self.ec.run_id)
218
219     @property
220     def pid(self):
221         return self._pid
222
223     @property
224     def ppid(self):
225         return self._ppid
226
227     @property
228     def in_foreground(self):
229         """ Returns True if the command needs to be executed in foreground.
230         This means that command will be executed using 'execute' instead of
231         'run' ('run' executes a command in background and detached from the 
232         terminal)
233         
234         When using X11 forwarding option, the command can not run in background
235         and detached from a terminal, since we need to keep the terminal attached 
236         to interact with it.
237         """
238         return self.get("forwardX11") or self._in_foreground
239
240     def trace_filepath(self, filename):
241         return os.path.join(self.run_home, filename)
242
243     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
244         self.info("Retrieving '%s' trace %s " % (name, attr))
245
246         path = self.trace_filepath(name)
247         
248         command = "(test -f %s && echo 'success') || echo 'error'" % path
249         (out, err), proc = self.node.execute(command)
250
251         if (err and proc.poll()) or out.find("error") != -1:
252             msg = " Couldn't find trace %s " % name
253             self.error(msg, out, err)
254             return None
255     
256         if attr == TraceAttr.PATH:
257             return path
258
259         if attr == TraceAttr.ALL:
260             (out, err), proc = self.node.check_output(self.run_home, name)
261             
262             if proc.poll():
263                 msg = " Couldn't read trace %s " % name
264                 self.error(msg, out, err)
265                 return None
266
267             return out
268
269         if attr == TraceAttr.STREAM:
270             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
271         elif attr == TraceAttr.SIZE:
272             cmd = "stat -c%%s %s " % path
273
274         (out, err), proc = self.node.execute(cmd)
275
276         if proc.poll():
277             msg = " Couldn't find trace %s " % name
278             self.error(msg, out, err)
279             return None
280         
281         if attr == TraceAttr.SIZE:
282             out = int(out.strip())
283
284         return out
285
286     def do_provision(self):
287         # take a snapshot of the system if user is root
288         # to ensure that cleanProcess will not kill
289         # pre-existent processes
290         if self.node.get("username") == 'root':
291             import pickle
292             procs = dict()
293             ps_aux = "ps aux |awk '{print $2,$11}'"
294             (out, err), proc = self.node.execute(ps_aux)
295             if len(out) != 0:
296                 for line in out.strip().split("\n"):
297                     parts = line.strip().split(" ")
298                     procs[parts[0]] = parts[1]
299                 with open("/tmp/save.proc", "wb") as pickle_file:
300                     pickle.dump(procs, pickle_file)
301             
302         # create run dir for application
303         self.node.mkdir(self.run_home)
304    
305         # List of all the provision methods to invoke
306         steps = [
307             # upload sources
308             self.upload_sources,
309             # upload files
310             self.upload_files,
311             # upload binaries
312             self.upload_binaries,
313             # upload libraries
314             self.upload_libraries,
315             # upload code
316             self.upload_code,
317             # upload stdin
318             self.upload_stdin,
319             # install dependencies
320             self.install_dependencies,
321             # build
322             self.build,
323             # Install
324             self.install]
325
326         command = []
327
328         # Since provisioning takes a long time, before
329         # each step we check that the EC is still 
330         for step in steps:
331             if self.ec.abort:
332                 self.debug("Interrupting provisioning. EC says 'ABORT")
333                 return
334             
335             ret = step()
336             if ret:
337                 command.append(ret)
338
339         # upload deploy script
340         deploy_command = ";".join(command)
341         self.execute_deploy_command(deploy_command)
342
343         # upload start script
344         self.upload_start_command()
345        
346         self.info("Provisioning finished")
347
348         super(LinuxApplication, self).do_provision()
349
350     def upload_start_command(self, overwrite = False):
351         # Upload command to remote bash script
352         # - only if command can be executed in background and detached
353         command = self.get("command")
354
355         if command and not self.in_foreground:
356             self.info("Uploading command '%s'" % command)
357
358             # replace application specific paths in the command
359             command = self.replace_paths(command)
360             # replace application specific paths in the environment
361             env = self.get("env")
362             env = env and self.replace_paths(env)
363
364             shfile = os.path.join(self.app_home, "start.sh")
365
366             self.node.upload_command(command, 
367                     shfile = shfile,
368                     env = env,
369                     overwrite = overwrite)
370
371     def execute_deploy_command(self, command, prefix="deploy"):
372         if command:
373             # replace application specific paths in the command
374             command = self.replace_paths(command)
375             
376             # replace application specific paths in the environment
377             env = self.get("env")
378             env = env and self.replace_paths(env)
379
380             # Upload the command to a bash script and run it
381             # in background ( but wait until the command has
382             # finished to continue )
383             shfile = os.path.join(self.app_home, "%s.sh" % prefix)
384             self.node.run_and_wait(command, self.run_home,
385                     shfile = shfile, 
386                     overwrite = False,
387                     pidfile = "%s_pidfile" % prefix, 
388                     ecodefile = "%s_exitcode" % prefix, 
389                     stdout = "%s_stdout" % prefix, 
390                     stderr = "%s_stderr" % prefix)
391
392     def upload_sources(self, sources = None, src_dir = None):
393         if not sources:
394             sources = self.get("sources")
395    
396         command = ""
397
398         if not src_dir:
399             src_dir = self.node.src_dir
400
401         if sources:
402             self.info("Uploading sources ")
403
404             sources = [str.strip(source) for source in sources.split(";")]
405
406             # Separate sources that should be downloaded from 
407             # the web, from sources that should be uploaded from
408             # the local machine
409             command = []
410             for source in list(sources):
411                 if source.startswith("http") or source.startswith("https"):
412                     # remove the hhtp source from the sources list
413                     sources.remove(source)
414
415                     command.append( " ( " 
416                             # Check if the source already exists
417                             " ls %(src_dir)s/%(basename)s "
418                             " || ( "
419                             # If source doesn't exist, download it and check
420                             # that it it downloaded ok
421                             "   wget -c --directory-prefix=%(src_dir)s %(source)s && "
422                             "   ls %(src_dir)s/%(basename)s "
423                             " ) ) " % {
424                                 "basename": os.path.basename(source),
425                                 "source": source,
426                                 "src_dir": src_dir
427                                 })
428
429             command = " && ".join(command)
430
431             # replace application specific paths in the command
432             command = self.replace_paths(command)
433        
434             if sources:
435                 sources = ';'.join(sources)
436                 self.node.upload(sources, src_dir, overwrite = False)
437
438         return command
439
440     def upload_files(self, files = None):
441         if not files:
442             files = self.get("files")
443
444         if files:
445             self.info("Uploading files %s " % files)
446             self.node.upload(files, self.node.share_dir, overwrite = False)
447
448     def upload_libraries(self, libs = None):
449         if not libs:
450             libs = self.get("libs")
451
452         if libs:
453             self.info("Uploading libraries %s " % libaries)
454             self.node.upload(libs, self.node.lib_dir, overwrite = False)
455
456     def upload_binaries(self, bins = None):
457         if not bins:
458             bins = self.get("bins")
459
460         if bins:
461             self.info("Uploading binaries %s " % binaries)
462             self.node.upload(bins, self.node.bin_dir, overwrite = False)
463
464     def upload_code(self, code = None):
465         if not code:
466             code = self.get("code")
467
468         if code:
469             self.info("Uploading code")
470
471             dst = os.path.join(self.app_home, "code")
472             self.node.upload(code, dst, overwrite = False, text = True)
473
474     def upload_stdin(self, stdin = None):
475         if not stdin:
476            stdin = self.get("stdin")
477
478         if stdin:
479             # create dir for sources
480             self.info("Uploading stdin")
481             
482             # upload stdin file to ${SHARE_DIR} directory
483             if os.path.isfile(stdin):
484                 basename = os.path.basename(stdin)
485                 dst = os.path.join(self.node.share_dir, basename)
486             else:
487                 dst = os.path.join(self.app_home, "stdin")
488
489             self.node.upload(stdin, dst, overwrite = False, text = True)
490
491             # create "stdin" symlink on ${APP_HOME} directory
492             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
493                 "app_home": self.app_home, 
494                 "stdin": dst })
495
496             return command
497
498     def install_dependencies(self, depends = None):
499         if not depends:
500             depends = self.get("depends")
501
502         if depends:
503             self.info("Installing dependencies %s" % depends)
504             return self.node.install_packages_command(depends)
505
506     def build(self, build = None):
507         if not build:
508             build = self.get("build")
509
510         if build:
511             self.info("Building sources ")
512             
513             # replace application specific paths in the command
514             return self.replace_paths(build)
515
516     def install(self, install = None):
517         if not install:
518             install = self.get("install")
519
520         if install:
521             self.info("Installing sources ")
522
523             # replace application specific paths in the command
524             return self.replace_paths(install)
525
526     def do_deploy(self):
527         # Wait until node is associated and deployed
528         node = self.node
529         if not node or node.state < ResourceState.READY:
530             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state)
531             self.ec.schedule(self.reschedule_delay, self.deploy)
532         else:
533             command = self.get("command") or ""
534             self.info("Deploying command '%s' " % command)
535             self.do_discover()
536             self.do_provision()
537
538             super(LinuxApplication, self).do_deploy()
539    
540     def do_start(self):
541         command = self.get("command")
542
543         self.info("Starting command '%s'" % command)
544
545         if not command:
546             # If no command was given (i.e. Application was used for dependency
547             # installation), then the application is directly marked as STOPPED
548             super(LinuxApplication, self).set_stopped()
549         else:
550             if self.in_foreground:
551                 self._run_in_foreground()
552             else:
553                 self._run_in_background()
554
555             super(LinuxApplication, self).do_start()
556
557     def _run_in_foreground(self):
558         command = self.get("command")
559         sudo = self.get("sudo") or False
560         x11 = self.get("forwardX11")
561         env = self.get("env")
562
563         # Command will be launched in foreground and attached to the
564         # terminal using the node 'execute' in non blocking mode.
565
566         # We save the reference to the process in self._proc 
567         # to be able to kill the process from the stop method.
568         # We also set blocking = False, since we don't want the
569         # thread to block until the execution finishes.
570         (out, err), self._proc = self.execute_command(command, 
571                 env = env,
572                 sudo = sudo,
573                 forward_x11 = x11,
574                 blocking = False)
575
576         if self._proc.poll():
577             self.error(msg, out, err)
578             raise RuntimeError(msg)
579
580     def _run_in_background(self):
581         command = self.get("command")
582         env = self.get("env")
583         sudo = self.get("sudo") or False
584
585         stdout = "stdout"
586         stderr = "stderr"
587         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
588                 else None
589
590         # Command will be run as a daemon in baground and detached from any
591         # terminal.
592         # The command to run was previously uploaded to a bash script
593         # during deployment, now we launch the remote script using 'run'
594         # method from the node.
595         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
596         (out, err), proc = self.node.run(cmd, self.run_home, 
597             stdin = stdin, 
598             stdout = stdout,
599             stderr = stderr,
600             sudo = sudo)
601
602         # check if execution errors occurred
603         msg = " Failed to start command '%s' " % command
604         
605         if proc.poll():
606             self.error(msg, out, err)
607             raise RuntimeError(msg)
608     
609         # Wait for pid file to be generated
610         pid, ppid = self.node.wait_pid(self.run_home)
611         if pid: self._pid = int(pid)
612         if ppid: self._ppid = int(ppid)
613
614         # If the process is not running, check for error information
615         # on the remote machine
616         if not self.pid or not self.ppid:
617             (out, err), proc = self.node.check_errors(self.run_home,
618                     stderr = stderr) 
619
620             # Out is what was written in the stderr file
621             if err:
622                 msg = " Failed to start command '%s' " % command
623                 self.error(msg, out, err)
624                 raise RuntimeError(msg)
625     
626     def do_stop(self):
627         """ Stops application execution
628         """
629         command = self.get('command') or ''
630
631         if self.state == ResourceState.STARTED:
632         
633             self.info("Stopping command '%s' " % command)
634         
635             # If the command is running in foreground (it was launched using
636             # the node 'execute' method), then we use the handler to the Popen
637             # process to kill it. Else we send a kill signal using the pid and ppid
638             # retrieved after running the command with the node 'run' method
639             if self._proc:
640                 self._proc.kill()
641             else:
642                 # Only try to kill the process if the pid and ppid
643                 # were retrieved
644                 if self.pid and self.ppid:
645                     (out, err), proc = self.node.kill(self.pid, self.ppid,
646                             sudo = self._sudo_kill)
647
648                     """
649                     # TODO: check if execution errors occurred
650                     if (proc and proc.poll()) or err:
651                         msg = " Failed to STOP command '%s' " % self.get("command")
652                         self.error(msg, out, err)
653                     """
654
655             super(LinuxApplication, self).do_stop()
656
657     def do_release(self):
658         self.info("Releasing resource")
659
660         self.do_stop()
661         
662         tear_down = self.get("tearDown")
663         if tear_down:
664             self.node.execute(tear_down)
665
666         hard_release = self.get("hardRelease")
667         if hard_release:
668             self.node.rmdir(self.app_home)
669
670         super(LinuxApplication, self).do_release()
671         
672     @property
673     def state(self):
674         """ Returns the state of the application
675         """
676         if self._state == ResourceState.STARTED:
677             if self.in_foreground:
678                 # Check if the process we used to execute the command
679                 # is still running ...
680                 retcode = self._proc.poll()
681
682                 # retcode == None -> running
683                 # retcode > 0 -> error
684                 # retcode == 0 -> finished
685                 if retcode:
686                     out = ""
687                     msg = " Failed to execute command '%s'" % self.get("command")
688                     err = self._proc.stderr.read()
689                     self.error(msg, out, err)
690                     self.do_fail()
691
692                 elif retcode == 0:
693                     self.set_stopped()
694             else:
695                 # We need to query the status of the command we launched in 
696                 # background. In order to avoid overwhelming the remote host and
697                 # the local processor with too many ssh queries, the state is only
698                 # requested every 'state_check_delay' seconds.
699                 state_check_delay = 0.5
700                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
701                     if self.pid and self.ppid:
702                         # Make sure the process is still running in background
703                         status = self.node.status(self.pid, self.ppid)
704
705                         if status == ProcStatus.FINISHED:
706                             # If the program finished, check if execution
707                             # errors occurred
708                             (out, err), proc = self.node.check_errors(
709                                     self.run_home)
710
711                             if err:
712                                 msg = "Failed to execute command '%s'" % \
713                                         self.get("command")
714                                 self.error(msg, out, err)
715                                 self.do_fail()
716                             else:
717                                 self.set_stopped()
718
719                     self._last_state_check = tnow()
720
721         return self._state
722
723     def execute_command(self, command, 
724             env=None,
725             sudo=False,
726             tty=False,
727             forward_x11=False,
728             blocking=False):
729
730         environ = ""
731         if env:
732             environ = self.node.format_environment(env, inline=True)
733         command = environ + command
734         command = self.replace_paths(command)
735
736         return self.node.execute(command,
737                 sudo=sudo,
738                 tty=tty,
739                 forward_x11=forward_x11,
740                 blocking=blocking)
741
742     def replace_paths(self, command, node=None, app_home=None, run_home=None):
743         """
744         Replace all special path tags with shell-escaped actual paths.
745         """
746         if not node:
747             node=self.node
748
749         if not app_home:
750             app_home=self.app_home
751
752         if not run_home:
753             run_home = self.run_home
754
755         return ( command
756             .replace("${USR}", node.usr_dir)
757             .replace("${LIB}", node.lib_dir)
758             .replace("${BIN}", node.bin_dir)
759             .replace("${SRC}", node.src_dir)
760             .replace("${SHARE}", node.share_dir)
761             .replace("${EXP}", node.exp_dir)
762             .replace("${EXP_HOME}", node.exp_home)
763             .replace("${APP_HOME}", app_home)
764             .replace("${RUN_HOME}", run_home)
765             .replace("${NODE_HOME}", node.node_home)
766             .replace("${HOME}", node.home_dir)
767             )
768
769     def valid_connection(self, guid):
770         # TODO: Validate!
771         return True
772