Adding CCN RMs for Linux backend
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
26
27 import os
28
29 # TODO: Resolve wildcards in commands!!
30
31
32 @clsinit
33 class LinuxApplication(ResourceManager):
34     _rtype = "LinuxApplication"
35
36     @classmethod
37     def _register_attributes(cls):
38         command = Attribute("command", "Command to execute", 
39                 flags = Flags.ExecReadOnly)
40         forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
41                 flags = Flags.ExecReadOnly)
42         env = Attribute("env", "Environment variables string for command execution",
43                 flags = Flags.ExecReadOnly)
44         sudo = Attribute("sudo", "Run with root privileges", 
45                 flags = Flags.ExecReadOnly)
46         depends = Attribute("depends", 
47                 "Space-separated list of packages required to run the application",
48                 flags = Flags.ExecReadOnly)
49         sources = Attribute("sources", 
50                 "Space-separated list of regular files to be deployed in the working "
51                 "path prior to building. Archives won't be expanded automatically.",
52                 flags = Flags.ExecReadOnly)
53         code = Attribute("code", 
54                 "Plain text source code to be uploaded to the server. It will be stored "
55                 "under ${SOURCES}/code",
56                 flags = Flags.ExecReadOnly)
57         build = Attribute("build", 
58                 "Build commands to execute after deploying the sources. "
59                 "Sources will be in the ${SOURCES} folder. "
60                 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
61                 "Try to make the commands return with a nonzero exit code on error.\n"
62                 "Also, do not install any programs here, use the 'install' attribute. This will "
63                 "help keep the built files constrained to the build folder (which may "
64                 "not be the home folder), and will result in faster deployment. Also, "
65                 "make sure to clean up temporary files, to reduce bandwidth usage between "
66                 "nodes when transferring built packages.",
67                 flags = Flags.ReadOnly)
68         install = Attribute("install", 
69                 "Commands to transfer built files to their final destinations. "
70                 "Sources will be in the initial working folder, and a special "
71                 "tag ${SOURCES} can be used to reference the experiment's "
72                 "home folder (where the application commands will run).\n"
73                 "ALL sources and targets needed for execution must be copied there, "
74                 "if building has been enabled.\n"
75                 "That is, 'slave' nodes will not automatically get any source files. "
76                 "'slave' nodes don't get build dependencies either, so if you need "
77                 "make and other tools to install, be sure to provide them as "
78                 "actual dependencies instead.",
79                 flags = Flags.ReadOnly)
80         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
81         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
82         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
83         tear_down = Attribute("tearDown", "Bash script to be executed before "
84                 "releasing the resource", 
85                 flags = Flags.ReadOnly)
86
87         cls._register_attribute(command)
88         cls._register_attribute(forward_x11)
89         cls._register_attribute(env)
90         cls._register_attribute(sudo)
91         cls._register_attribute(depends)
92         cls._register_attribute(sources)
93         cls._register_attribute(code)
94         cls._register_attribute(build)
95         cls._register_attribute(install)
96         cls._register_attribute(stdin)
97         cls._register_attribute(stdout)
98         cls._register_attribute(stderr)
99         cls._register_attribute(tear_down)
100
101     @classmethod
102     def _register_traces(cls):
103         stdout = Trace("stdout", "Standard output stream")
104         stderr = Trace("stderr", "Standard error stream")
105
106         cls._register_trace(stdout)
107         cls._register_trace(stderr)
108
109     def __init__(self, ec, guid):
110         super(LinuxApplication, self).__init__(ec, guid)
111         self._pid = None
112         self._ppid = None
113         self._home = "app-%s" % self.guid
114
115         # keep a reference to the running process handler when 
116         # the command is not executed as remote daemon in background
117         self._proc = None
118
119         # timestamp of last state check of the application
120         self._last_state_check = strfnow()
121     
122     def log_message(self, msg):
123         return " guid %d - host %s - %s " % (self.guid, 
124                 self.node.get("hostname"), msg)
125
126     @property
127     def node(self):
128         node = self.get_connected(LinuxNode.rtype())
129         if node: return node[0]
130         return None
131
132     @property
133     def app_home(self):
134         return os.path.join(self.node.exp_home, self._home)
135
136     @property
137     def src_dir(self):
138         return os.path.join(self.app_home, 'src')
139
140     @property
141     def build_dir(self):
142         return os.path.join(self.app_home, 'build')
143
144     @property
145     def pid(self):
146         return self._pid
147
148     @property
149     def ppid(self):
150         return self._ppid
151
152     @property
153     def in_foreground(self):
154         """ Returns True if the command needs to be executed in foreground.
155         This means that command will be executed using 'execute' instead of
156         'run' ('run' executes a command in background and detached from the 
157         terminal)
158
159         When using X11 forwarding option, the command can not run in background
160         and detached from a terminal, since we need to keep the terminal attached 
161         to interact with it.
162         """
163         return self.get("forwardX11") or False
164
165     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
166         self.info("Retrieving '%s' trace %s " % (name, attr))
167
168         path = os.path.join(self.app_home, name)
169         
170         command = "(test -f %s && echo 'success') || echo 'error'" % path
171         (out, err), proc = self.node.execute(command)
172
173         if (err and proc.poll()) or out.find("error") != -1:
174             msg = " Couldn't find trace %s " % name
175             self.error(msg, out, err)
176             return None
177     
178         if attr == TraceAttr.PATH:
179             return path
180
181         if attr == TraceAttr.ALL:
182             (out, err), proc = self.node.check_output(self.app_home, name)
183             
184             if err and proc.poll():
185                 msg = " Couldn't read trace %s " % name
186                 self.error(msg, out, err)
187                 return None
188
189             return out
190
191         if attr == TraceAttr.STREAM:
192             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
193         elif attr == TraceAttr.SIZE:
194             cmd = "stat -c%%s %s " % path
195
196         (out, err), proc = self.node.execute(cmd)
197
198         if err and proc.poll():
199             msg = " Couldn't find trace %s " % name
200             self.error(msg, out, err)
201             return None
202         
203         if attr == TraceAttr.SIZE:
204             out = int(out.strip())
205
206         return out
207             
208     def provision(self):
209         # create home dir for application
210         self.node.mkdir(self.app_home)
211
212         # upload sources
213         self.upload_sources()
214
215         # upload code
216         self.upload_code()
217
218         # upload stdin
219         self.upload_stdin()
220
221         # install dependencies
222         self.install_dependencies()
223
224         # build
225         self.build()
226
227         # Install
228         self.install()
229
230         # Upload command to remote bash script
231         # - only if command can be executed in background and detached
232         command = self.get("command")
233
234         if command and not self.in_foreground:
235             self.info("Uploading command '%s'" % command)
236
237             # replace application specific paths in the command
238             command = self.replace_paths(command)
239             
240             # replace application specific paths in the environment
241             env = self.get("env")
242             env = env and self.replace_paths(env)
243
244             self.node.upload_command(command, self.app_home, 
245                     shfile = "app.sh",
246                     env = env)
247        
248         self.info("Provisioning finished")
249
250         super(LinuxApplication, self).provision()
251
252     def upload_sources(self):
253         sources = self.get("sources")
254         if sources:
255             self.info("Uploading sources ")
256
257             # create dir for sources
258             self.node.mkdir(self.src_dir)
259
260             sources = sources.split(' ')
261
262             http_sources = list()
263             for source in list(sources):
264                 if source.startswith("http") or source.startswith("https"):
265                     http_sources.append(source)
266                     sources.remove(source)
267
268             # Download http sources remotely
269             if http_sources:
270                 command = [" wget -c --directory-prefix=${SOURCES} "]
271                 check = []
272
273                 for source in http_sources:
274                     command.append(" %s " % (source))
275                     check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
276                 
277                 command = " ".join(command)
278                 check = " ; ".join(check)
279
280                 # Append the command to check that the sources were downloaded
281                 command += " ; %s " % check
282
283                 # replace application specific paths in the command
284                 command = self.replace_paths(command)
285                 
286                 # Upload the command to a bash script and run it
287                 # in background ( but wait until the command has
288                 # finished to continue )
289                 self.node.run_and_wait(command, self.app_home,
290                         shfile = "http_sources.sh",
291                         pidfile = "http_sources_pidfile", 
292                         ecodefile = "http_sources_exitcode", 
293                         stdout = "http_sources_stdout", 
294                         stderr = "http_sources_stderr")
295
296             if sources:
297                 self.node.upload(sources, self.src_dir)
298
299     def upload_code(self):
300         code = self.get("code")
301         if code:
302             # create dir for sources
303             self.node.mkdir(self.src_dir)
304
305             self.info("Uploading code ")
306
307             dst = os.path.join(self.src_dir, "code")
308             self.node.upload(sources, dst, text = True)
309
310     def upload_stdin(self):
311         stdin = self.get("stdin")
312         if stdin:
313             # create dir for sources
314             self.info(" Uploading stdin ")
315
316             dst = os.path.join(self.app_home, "stdin")
317             self.node.upload(stdin, dst, text = True)
318
319     def install_dependencies(self):
320         depends = self.get("depends")
321         if depends:
322             self.info("Installing dependencies %s" % depends)
323             self.node.install_packages(depends, self.app_home)
324
325     def build(self):
326         build = self.get("build")
327         if build:
328             self.info("Building sources ")
329             
330             # create dir for build
331             self.node.mkdir(self.build_dir)
332
333             # replace application specific paths in the command
334             command = self.replace_paths(build)
335
336             # Upload the command to a bash script and run it
337             # in background ( but wait until the command has
338             # finished to continue )
339             self.node.run_and_wait(command, self.app_home,
340                     shfile = "build.sh",
341                     pidfile = "build_pidfile", 
342                     ecodefile = "build_exitcode", 
343                     stdout = "build_stdout", 
344                     stderr = "build_stderr")
345  
346     def install(self):
347         install = self.get("install")
348         if install:
349             self.info("Installing sources ")
350
351             # replace application specific paths in the command
352             command = self.replace_paths(install)
353
354             # Upload the command to a bash script and run it
355             # in background ( but wait until the command has
356             # finished to continue )
357             self.node.run_and_wait(command, self.app_home,
358                     shfile = "install.sh",
359                     pidfile = "install_pidfile", 
360                     ecodefile = "install_exitcode", 
361                     stdout = "install_stdout", 
362                     stderr = "install_stderr")
363
364     def deploy(self):
365         # Wait until node is associated and deployed
366         node = self.node
367         if not node or node.state < ResourceState.READY:
368             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
369             
370             reschedule_delay = "0.5s"
371             self.ec.schedule(reschedule_delay, self.deploy)
372         else:
373             try:
374                 command = self.get("command") or ""
375                 self.info("Deploying command '%s' " % command)
376                 self.discover()
377                 self.provision()
378             except:
379                 self._state = ResourceState.FAILED
380                 raise
381
382             super(LinuxApplication, self).deploy()
383
384     def start(self):
385         command = self.get("command")
386
387         self.info("Starting command '%s'" % command)
388
389         if not command:
390             # If no command was given (i.e. Application was used for dependency
391             # installation), then the application is directly marked as FINISHED
392             self._state = ResourceState.FINISHED
393         else:
394             if self.in_foreground:
395                 self._start_in_foreground()
396             else:
397                 self._start_in_background()
398
399             super(LinuxApplication, self).start()
400
401     def _start_in_foreground(self):
402         command = self.get("command")
403         env = self.get("env")
404         stdin = "stdin" if self.get("stdin") else None
405         sudo = self.get("sudo") or False
406         x11 = self.get("forwardX11")
407
408         # Command will be launched in foreground and attached to the
409         # terminal using the node 'execute' in non blocking mode.
410
411         # Export environment
412         environ = self.node.format_environment(env, inline = True)
413         command = environ + command
414         command = self.replace_paths(command)
415
416         self.info("Starting command IN FOREGROUND '%s'" % command)
417         
418         # We save the reference to the process in self._proc 
419         # to be able to kill the process from the stop method.
420         # We also set blocking = False, since we don't want the
421         # thread to block until the execution finishes.
422         (out, err), self._proc = self.node.execute(command,
423                 sudo = sudo,
424                 stdin = stdin,
425                 forward_x11 = x11,
426                 blocking = False)
427
428         if self._proc.poll():
429             self._state = ResourceState.FAILED
430             self.error(msg, out, err)
431             raise RuntimeError, msg
432
433     def _start_in_background(self):
434         command = self.get("command")
435         env = self.get("env")
436         stdin = "stdin" if self.get("stdin") else None
437         stdout = "stdout" if self.get("stdout") else "stdout"
438         stderr = "stderr" if self.get("stderr") else "stderr"
439         sudo = self.get("sudo") or False
440
441         # Command will be as a daemon in baground and detached from any terminal.
442         # The real command to run was previously uploaded to a bash script
443         # during deployment, now launch the remote script using 'run'
444         # method from the node
445         cmd = "bash ./app.sh"
446         (out, err), proc = self.node.run(cmd, self.app_home, 
447             stdin = stdin, 
448             stdout = stdout,
449             stderr = stderr,
450             sudo = sudo)
451
452         # check if execution errors occurred
453         msg = " Failed to start command '%s' " % command
454         
455         if proc.poll():
456             self._state = ResourceState.FAILED
457             self.error(msg, out, err)
458             raise RuntimeError, msg
459     
460         # Wait for pid file to be generated
461         pid, ppid = self.node.wait_pid(self.app_home)
462         if pid: self._pid = int(pid)
463         if ppid: self._ppid = int(ppid)
464
465         # If the process is not running, check for error information
466         # on the remote machine
467         if not self.pid or not self.ppid:
468             (out, err), proc = self.check_errors(home, ecodefile, stderr)
469
470             # Out is what was written in the stderr file
471             if err:
472                 self._state = ResourceState.FAILED
473                 msg = " Failed to start command '%s' " % command
474                 self.error(msg, out, err)
475                 raise RuntimeError, msg
476         
477     def stop(self):
478         """ Stops application execution
479         """
480         command = self.get('command') or ''
481
482         if self.state == ResourceState.STARTED:
483             stopped = True
484
485             self.info("Stopping command '%s'" % command)
486         
487             # If the command is running in foreground (it was launched using
488             # the node 'execute' method), then we use the handler to the Popen
489             # process to kill it. Else we send a kill signal using the pid and ppid
490             # retrieved after running the command with the node 'run' method
491
492             if self._proc:
493                 self._proc.kill()
494             else:
495                 (out, err), proc = self.node.kill(self.pid, self.ppid)
496
497                 if out or err:
498                     # check if execution errors occurred
499                     msg = " Failed to STOP command '%s' " % self.get("command")
500                     self.error(msg, out, err)
501                     self._state = ResourceState.FAILED
502                     stopped = False
503
504             if stopped:
505                 super(LinuxApplication, self).stop()
506
507     def release(self):
508         self.info("Releasing resource")
509
510         tear_down = self.get("tearDown")
511         if tear_down:
512             self.node.execute(tear_down)
513
514         self.stop()
515
516         if self.state == ResourceState.STOPPED:
517             super(LinuxApplication, self).release()
518     
519     @property
520     def state(self):
521         """ Returns the state of the application
522         """
523         if self._state == ResourceState.STARTED:
524             if self.in_foreground:
525                 # Check if the process we used to execute the command
526                 # is still running ...
527                 retcode = self._proc.poll()
528                 
529                 # retcode == None -> running
530                 # retcode > 0 -> error
531                 # retcode == 0 -> finished
532                 if retcode:
533                     out = ""
534                     msg = " Failed to execute command '%s'" % self.get("command")
535                     err = self._proc.stderr.read()
536                     self.error(msg, out, err)
537                     self._state = ResourceState.FAILED
538                 elif retcode == 0:
539                     self._state = ResourceState.FINISHED
540
541             else:
542                 # We need to query the status of the command we launched in 
543                 # background. In oredr to avoid overwhelming the remote host and
544                 # the local processor with too many ssh queries, the state is only
545                 # requested every 'state_check_delay' seconds.
546                 state_check_delay = 0.5
547                 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
548                     # check if execution errors occurred
549                     (out, err), proc = self.node.check_errors(self.app_home)
550
551                     if err:
552                         msg = " Failed to execute command '%s'" % self.get("command")
553                         self.error(msg, out, err)
554                         self._state = ResourceState.FAILED
555
556                     elif self.pid and self.ppid:
557                         # No execution errors occurred. Make sure the background
558                         # process with the recorded pid is still running.
559                         status = self.node.status(self.pid, self.ppid)
560
561                         if status == ProcStatus.FINISHED:
562                             self._state = ResourceState.FINISHED
563
564                     self._last_state_check = strfnow()
565
566         return self._state
567
568     def replace_paths(self, command):
569         """
570         Replace all special path tags with shell-escaped actual paths.
571         """
572         def absolute_dir(d):
573             return d if d.startswith("/") else os.path.join("${HOME}", d)
574
575         return ( command
576             .replace("${SOURCES}", absolute_dir(self.src_dir))
577             .replace("${BUILD}", absolute_dir(self.build_dir))
578             .replace("${APP_HOME}", absolute_dir(self.app_home))
579             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
580             .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
581             )
582         
583     def valid_connection(self, guid):
584         # TODO: Validate!
585         return True
586