Bugfixes for LinuxApplication and LinuxNode
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
26
27 import os
28
29 # TODO: Resolve wildcards in commands!!
30
31
@clsinit
class LinuxApplication(ResourceManager):
    """ Resource manager for an application executed on a LinuxNode.

    The application gets its own folder ("app-<guid>") under the node's
    experiment home. Provisioning uploads sources, code and stdin, installs
    dependencies, runs the 'build' and 'install' commands and uploads the
    main command as "app.sh". Starting runs "app.sh" through the node's
    asynchronous 'run', or, when X11 forwarding is requested, executes the
    command synchronously through the node's 'execute'.
    """
    _rtype = "LinuxApplication"

    @classmethod
    def _register_attributes(cls):
        """ Declare the configurable attributes of the application. """
        command = Attribute("command", "Command to execute", 
                flags = Flags.ExecReadOnly)
        forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
                flags = Flags.ExecReadOnly)
        env = Attribute("env", "Environment variables string for command execution",
                flags = Flags.ExecReadOnly)
        sudo = Attribute("sudo", "Run with root privileges", 
                flags = Flags.ExecReadOnly)
        depends = Attribute("depends", 
                "Space-separated list of packages required to run the application",
                flags = Flags.ExecReadOnly)
        sources = Attribute("sources", 
                "Space-separated list of regular files to be deployed in the working "
                "path prior to building. Archives won't be expanded automatically.",
                flags = Flags.ExecReadOnly)
        code = Attribute("code", 
                "Plain text source code to be uploaded to the server. It will be stored "
                "under ${SOURCES}/code",
                flags = Flags.ExecReadOnly)
        build = Attribute("build", 
                "Build commands to execute after deploying the sources. "
                "Sources will be in the ${SOURCES} folder. "
                "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
                "Try to make the commands return with a nonzero exit code on error.\n"
                "Also, do not install any programs here, use the 'install' attribute. This will "
                "help keep the built files constrained to the build folder (which may "
                "not be the home folder), and will result in faster deployment. Also, "
                "make sure to clean up temporary files, to reduce bandwidth usage between "
                "nodes when transferring built packages.",
                flags = Flags.ReadOnly)
        install = Attribute("install", 
                "Commands to transfer built files to their final destinations. "
                "Sources will be in the initial working folder, and a special "
                "tag ${SOURCES} can be used to reference the experiment's "
                "home folder (where the application commands will run).\n"
                "ALL sources and targets needed for execution must be copied there, "
                "if building has been enabled.\n"
                "That is, 'slave' nodes will not automatically get any source files. "
                "'slave' nodes don't get build dependencies either, so if you need "
                "make and other tools to install, be sure to provide them as "
                "actual dependencies instead.",
                flags = Flags.ReadOnly)
        stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
        stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
        stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
        tear_down = Attribute("tearDown", "Bash script to be executed before "
                "releasing the resource", 
                flags = Flags.ReadOnly)

        cls._register_attribute(command)
        cls._register_attribute(forward_x11)
        cls._register_attribute(env)
        cls._register_attribute(sudo)
        cls._register_attribute(depends)
        cls._register_attribute(sources)
        cls._register_attribute(code)
        cls._register_attribute(build)
        cls._register_attribute(install)
        cls._register_attribute(stdin)
        cls._register_attribute(stdout)
        cls._register_attribute(stderr)
        cls._register_attribute(tear_down)

    @classmethod
    def _register_traces(cls):
        """ Declare the traces (remote output files) the application exposes. """
        stdout = Trace("stdout", "Standard output stream")
        stderr = Trace("stderr", "Standard error stream")

        cls._register_trace(stdout)
        cls._register_trace(stderr)

    def __init__(self, ec, guid):
        super(LinuxApplication, self).__init__(ec, guid)
        # pid/ppid of the remote process, filled in by start()
        self._pid = None
        self._ppid = None
        # folder name of the application, relative to the node's exp_home
        self._home = "app-%s" % self.guid

        # timestamp of last state check of the application
        self._last_state_check = strfnow()
    
    def log_message(self, msg):
        """ Prefix *msg* with the resource guid and the node's hostname.

        The node may not be connected yet (deploy() reschedules itself in
        that case), so a missing node is reported as host None instead of
        raising AttributeError.
        """
        node = self.node
        hostname = node.get("hostname") if node else None
        return " guid %d - host %s - %s " % (self.guid, hostname, msg)

    @property
    def node(self):
        """ The first connected LinuxNode, or None if there is none. """
        node = self.get_connected(LinuxNode.rtype())
        if node: return node[0]
        return None

    @property
    def app_home(self):
        """ Application folder inside the node's experiment home. """
        return os.path.join(self.node.exp_home, self._home)

    @property
    def src_dir(self):
        """ Folder where sources and code are deployed (${SOURCES}). """
        return os.path.join(self.app_home, 'src')

    @property
    def build_dir(self):
        """ Folder created for the build step (${BUILD}). """
        return os.path.join(self.app_home, 'build')

    @property
    def pid(self):
        return self._pid

    @property
    def ppid(self):
        return self._ppid

    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """ Retrieve information about the trace file *name* in app_home.

        :param name: trace file name, relative to app_home
        :param attr: TraceAttr.PATH returns the remote path, TraceAttr.ALL
            the whole content, TraceAttr.STREAM one *block*-byte chunk
            after skipping *offset* blocks, TraceAttr.SIZE the size in bytes
        :param block: chunk size in bytes for TraceAttr.STREAM
        :param offset: number of blocks to skip for TraceAttr.STREAM
        :returns: the requested value, or None if the trace can't be read
            or *attr* is not supported
        """
        self.info("Retrieving '%s' trace %s " % (name, attr))

        path = os.path.join(self.app_home, name)
        
        command = "(test -f %s && echo 'success') || echo 'error'" % path
        (out, err), proc = self.node.execute(command)

        if (err and proc.poll()) or out.find("error") != -1:
            msg = " Couldn't find trace %s " % name
            self.error(msg, out, err)
            return None
    
        if attr == TraceAttr.PATH:
            return path

        if attr == TraceAttr.ALL:
            (out, err), proc = self.node.check_output(self.app_home, name)
            
            if err and proc.poll():
                msg = " Couldn't read trace %s " % name
                self.error(msg, out, err)
                return None

            return out

        if attr == TraceAttr.STREAM:
            # dd's 'skip' counts input blocks, so offset is in units of 'block'
            cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
        elif attr == TraceAttr.SIZE:
            # '%%s' renders as stat's '%s' format (total size in bytes)
            cmd = "stat -c%%s %s " % path
        else:
            # Without this branch 'cmd' would be unbound below
            msg = " Unsupported trace attribute %s " % attr
            self.error(msg)
            return None

        (out, err), proc = self.node.execute(cmd)

        if err and proc.poll():
            msg = " Couldn't find trace %s " % name
            self.error(msg, out, err)
            return None
        
        if attr == TraceAttr.SIZE:
            out = int(out.strip())

        return out
            
    def provision(self):
        """ Prepare the node: upload files, install, build and upload app.sh. """
        # create home dir for application
        self.node.mkdir(self.app_home)

        # upload sources
        self.upload_sources()

        # upload code
        self.upload_code()

        # upload stdin
        self.upload_stdin()

        # install dependencies
        self.install_dependencies()

        # build
        self.build()

        # Install
        self.install()

        # Upload command
        command = self.get("command")
        x11 = self.get("forwardX11")
        env = self.get("env")
        
        # With X11 forwarding the command is executed directly by start(),
        # so no app.sh is uploaded in that case
        if command and not x11:
            self.info("Uploading command '%s'" % command)

            # replace application specific paths in the command
            command = self.replace_paths(command)
            env = env and self.replace_paths(env)

            self.node.upload_command(command, self.app_home, 
                    shfile = "app.sh",
                    env = env)
       
        super(LinuxApplication, self).provision()

    def upload_sources(self):
        """ Deploy the entries of the 'sources' attribute into src_dir.

        http:// and https:// URLs are downloaded on the remote node with
        wget. Every other entry is uploaded as a local file.
        """
        # TODO: check if sources need to be uploaded and upload them
        sources = self.get("sources")
        if not sources:
            return

        self.info(" Uploading sources ")

        # create dir for sources
        self.node.mkdir(self.src_dir)

        # split() without arguments ignores repeated, leading and trailing
        # blanks, which split(' ') would turn into empty source names
        sources = sources.split()

        # Only real URLs are downloaded remotely; a local file whose name
        # merely starts with "http" (e.g. httpd.conf) is still uploaded
        url_prefixes = ("http://", "https://")
        http_sources = [s for s in sources if s.startswith(url_prefixes)]
        local_sources = [s for s in sources if not s.startswith(url_prefixes)]

        # Download http sources remotely
        if http_sources:
            command = " wget -c --directory-prefix=${SOURCES} %s " % \
                    " ".join(http_sources)

            # Append the command to check that the sources were downloaded.
            # The checks are chained with '&&' so the script's exit status
            # reflects every file, not only the last one checked.
            check = " && ".join(" ls ${SOURCES}/%s " % os.path.basename(source)
                    for source in http_sources)
            command += " ; %s " % check

            # replace application specific paths in the command
            command = self.replace_paths(command)
            
            # Upload the command to a file, and execute asynchronously
            self.node.run_and_wait(command, self.app_home,
                    shfile = "http_sources.sh",
                    pidfile = "http_sources_pidfile", 
                    ecodefile = "http_sources_exitcode", 
                    stdout = "http_sources_stdout", 
                    stderr = "http_sources_stderr")

        if local_sources:
            self.node.upload(local_sources, self.src_dir)

    def upload_code(self):
        """ Upload the 'code' attribute text to ${SOURCES}/code. """
        code = self.get("code")
        if code:
            # create dir for sources
            self.node.mkdir(self.src_dir)

            self.info(" Uploading code ")

            dst = os.path.join(self.src_dir, "code")
            # The text to upload is the 'code' attribute itself
            self.node.upload(code, dst, text = True)

    def upload_stdin(self):
        """ Upload the 'stdin' attribute text to a 'stdin' file in app_home. """
        stdin = self.get("stdin")
        if stdin:
            # app_home is created by provision() before this is called
            self.info(" Uploading stdin ")

            dst = os.path.join(self.app_home, "stdin")
            self.node.upload(stdin, dst, text = True)

    def install_dependencies(self):
        """ Install the packages listed in the 'depends' attribute. """
        depends = self.get("depends")
        if depends:
            self.info(" Installing dependencies %s" % depends)
            self.node.install_packages(depends, self.app_home)

    def build(self):
        """ Run the 'build' commands (paths replaced) and wait for them. """
        build = self.get("build")
        if build:
            self.info(" Building sources ")
            
            # create dir for build
            self.node.mkdir(self.build_dir)

            # replace application specific paths in the command
            command = self.replace_paths(build)

            # Upload the command to a file, and execute asynchronously
            self.node.run_and_wait(command, self.app_home,
                    shfile = "build.sh",
                    pidfile = "build_pidfile", 
                    ecodefile = "build_exitcode", 
                    stdout = "build_stdout", 
                    stderr = "build_stderr")
 
    def install(self):
        """ Run the 'install' commands (paths replaced) and wait for them. """
        install = self.get("install")
        if install:
            self.info(" Installing sources ")

            # replace application specific paths in the command
            command = self.replace_paths(install)

            # Upload the command to a file, and execute asynchronously
            self.node.run_and_wait(command, self.app_home,
                    shfile = "install.sh",
                    pidfile = "install_pidfile", 
                    ecodefile = "install_exitcode", 
                    stdout = "install_stdout", 
                    stderr = "install_stderr")

    def deploy(self):
        """ Deploy once the node is READY; otherwise reschedule in 0.5s.

        Any exception raised while discovering/provisioning marks the
        resource as FAILED and is re-raised.
        """
        # Wait until node is associated and deployed
        node = self.node
        if not node or node.state < ResourceState.READY:
            # node may still be None here, so don't dereference it blindly
            node_state = node.state if node else None
            self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % node_state)
            
            reschedule_delay = "0.5s"
            self.ec.schedule(reschedule_delay, self.deploy)
        else:
            try:
                command = self.get("command") or ""
                self.info("Deploying command '%s' " % command)
                self.discover()
                self.provision()
            except:
                self._state = ResourceState.FAILED
                raise

            super(LinuxApplication, self).deploy()

    def start(self):
        """ Start the application command on the node.

        - No command: the application is marked FINISHED (or FAILED, raising
          RuntimeError, if X11 forwarding was requested).
        - X11 forwarding: the command is executed synchronously; the state
          becomes FINISHED, or FAILED if the execution reported an error.
        - Otherwise: the uploaded app.sh is run asynchronously and the remote
          pid/ppid are recorded; RuntimeError is raised (state FAILED) if it
          can't be started.
        """
        command = self.get('command')
        env = self.get('env')
        stdin = 'stdin' if self.get('stdin') else None
        # Output always goes to the 'stdout'/'stderr' trace files
        stdout = 'stdout'
        stderr = 'stderr'
        sudo = self.get('sudo') or False
        x11 = self.get('forwardX11') or False

        if not command:
            if x11:
                msg = "No command is defined but X11 forwarding has been set"
                self.error(msg)
                self._state = ResourceState.FAILED
                raise RuntimeError(msg)

            # If no command was given, then the application 
            # is directly marked as FINISHED. provision() uploaded no
            # app.sh, so there is nothing to run.
            self._state = ResourceState.FINISHED
            return

        super(LinuxApplication, self).start()
    
        self.info("Starting command '%s'" % command)

        if x11:
            # If X11 forwarding was specified, then the application
            # can not run detached, so instead of invoking asynchronous
            # 'run' we invoke synchronous 'execute'.
            x11_command = command

            if env:
                # Export the environment so that processes started by the
                # command inherit it ('VAR=value ;' alone only sets shell
                # variables, which are not passed to child processes)
                x11_command = "{ export %s ; %s ; }" % (env, x11_command)

            # replace application specific paths in the command, whether
            # or not an environment was given
            x11_command = self.replace_paths(x11_command)

            (out, err), proc = self.node.execute(x11_command,
                    sudo = sudo,
                    stdin = stdin,
                    forward_x11 = x11)

            if proc.poll() and err:
                msg = " Failed to execute command '%s' " % command
                self.error(msg, out, err)
                self._state = ResourceState.FAILED
            else:
                self._state = ResourceState.FINISHED
        else:
            # Command was previously uploaded, now run the remote
            # bash file asynchronously
            cmd = "bash ./app.sh"
            (out, err), proc = self.node.run(cmd, self.app_home, 
                stdin = stdin, 
                stdout = stdout,
                stderr = stderr,
                sudo = sudo)

            # check if execution errors occurred
            msg = " Failed to start command '%s' " % command
            
            if proc.poll() and err:
                self.error(msg, out, err)
                self._state = ResourceState.FAILED
                raise RuntimeError(msg)
        
            # Check status of process running in background
            pid, ppid = self.node.wait_pid(self.app_home)
            if pid: self._pid = int(pid)
            if ppid: self._ppid = int(ppid)

            # If the process is not running, check for error information
            # on the remote machine
            if not self.pid or not self.ppid:
                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
                self.error(msg, out, err)

                msg2 = " Setting state to Failed"
                self.debug(msg2)
                self._state = ResourceState.FAILED

                raise RuntimeError(msg)

    def stop(self):
        """ Kill the remote process if the application is STARTED.

        Any output from the kill is treated as a failure (state FAILED);
        otherwise the base class stop() is invoked.
        """
        command = self.get('command') or ''
        state = self.state
        
        if state == ResourceState.STARTED:
            self.info("Stopping command '%s'" % command)

            (out, err), proc = self.node.kill(self.pid, self.ppid)

            if out or err:
                # check if execution errors occurred
                msg = " Failed to STOP command '%s' " % self.get("command")
                self.error(msg, out, err)
                self._state = ResourceState.FAILED
            else:
                super(LinuxApplication, self).stop()

    def release(self):
        """ Run 'tearDown' (if set), stop, and release if the app is STOPPED.

        NOTE(review): an application that is FINISHED (or never started) is
        not STOPPED after stop(), so the base release() is not called for
        it -- confirm this is intended.
        """
        self.info("Releasing resource")

        tear_down = self.get("tearDown")
        if tear_down:
            self.node.execute(tear_down)

        self.stop()
        if self.state == ResourceState.STOPPED:
            super(LinuxApplication, self).release()
    
    @property
    def state(self):
        """ Current state, refreshing a STARTED application from the node.

        Returns READY (without changing the stored state) when the node
        reports the command has not been executed yet.
        """
        if self._state == ResourceState.STARTED:
            # To avoid overwhelming the remote hosts and the local processor
            # with too many ssh queries, the state is only requested
            # every 'state_check_delay' seconds.
            state_check_delay = 0.5
            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                # check if execution errors occurred
                (out, err), proc = self.node.check_errors(self.app_home)

                if out or err:
                    # 'err' may be empty/None when only 'out' is set
                    if err and "No such file or directory" in err:
                        # The resource is marked as started, but the
                        # command was not yet executed
                        return ResourceState.READY

                    msg = " Failed to execute command '%s'" % self.get("command")
                    self.error(msg, out, err)
                    self._state = ResourceState.FAILED

                elif self.pid and self.ppid:
                    status = self.node.status(self.pid, self.ppid)

                    if status == ProcStatus.FINISHED:
                        self._state = ResourceState.FINISHED

                self._last_state_check = strfnow()

        return self._state

    def replace_paths(self, command):
        """
        Replace the special path tags ${SOURCES}, ${BUILD}, ${APP_HOME},
        ${NODE_HOME} and ${EXP_HOME} in *command* with the actual paths.

        Relative paths are made absolute by prefixing ${HOME}. The paths
        are inserted verbatim (they are NOT shell-escaped).
        """
        def absolute_dir(d):
            return d if d.startswith("/") else os.path.join("${HOME}", d)

        return ( command
            .replace("${SOURCES}", absolute_dir(self.src_dir))
            .replace("${BUILD}", absolute_dir(self.build_dir))
            .replace("${APP_HOME}", absolute_dir(self.app_home))
            .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
            .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
            )
        
    def valid_connection(self, guid):
        # TODO: Validate!
        return True
520