Bugfixing LinuxNode and LinuxApplication
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
26
27 import os
28
29 # TODO: Resolve wildcards in commands!!
30
31
@clsinit
class LinuxApplication(ResourceManager):
    """Resource manager for an application executed on a LinuxNode.

    The application is connected to a LinuxNode (see ``node``) and works
    inside its own home directory, ``app-<guid>``, located under the node's
    experiment home. ``provision`` uploads sources, code and stdin, installs
    dependencies, runs the build and install commands and uploads the main
    command as ``app.sh``. ``start`` runs ``app.sh`` in the background,
    unless X11 forwarding is requested, in which case the command is
    executed synchronously.
    """
    _rtype = "LinuxApplication"

    @classmethod
    def _register_attributes(cls):
        """Declare the configurable attributes of a LinuxApplication."""
        command = Attribute("command", "Command to execute", 
                flags = Flags.ExecReadOnly)
        forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
                flags = Flags.ExecReadOnly)
        env = Attribute("env", "Environment variables string for command execution",
                flags = Flags.ExecReadOnly)
        sudo = Attribute("sudo", "Run with root privileges", 
                flags = Flags.ExecReadOnly)
        depends = Attribute("depends", 
                "Space-separated list of packages required to run the application",
                flags = Flags.ExecReadOnly)
        sources = Attribute("sources", 
                "Space-separated list of regular files to be deployed in the working "
                "path prior to building. Archives won't be expanded automatically.",
                flags = Flags.ExecReadOnly)
        code = Attribute("code", 
                "Plain text source code to be uploaded to the server. It will be stored "
                "under ${SOURCES}/code",
                flags = Flags.ExecReadOnly)
        build = Attribute("build", 
                "Build commands to execute after deploying the sources. "
                "Sources will be in the ${SOURCES} folder. "
                "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
                "Try to make the commands return with a nonzero exit code on error.\n"
                "Also, do not install any programs here, use the 'install' attribute. This will "
                "help keep the built files constrained to the build folder (which may "
                "not be the home folder), and will result in faster deployment. Also, "
                "make sure to clean up temporary files, to reduce bandwidth usage between "
                "nodes when transferring built packages.",
                flags = Flags.ReadOnly)
        install = Attribute("install", 
                "Commands to transfer built files to their final destinations. "
                "Sources will be in the initial working folder, and a special "
                "tag ${SOURCES} can be used to reference the experiment's "
                "home folder (where the application commands will run).\n"
                "ALL sources and targets needed for execution must be copied there, "
                "if building has been enabled.\n"
                "That is, 'slave' nodes will not automatically get any source files. "
                "'slave' nodes don't get build dependencies either, so if you need "
                "make and other tools to install, be sure to provide them as "
                "actual dependencies instead.",
                flags = Flags.ReadOnly)
        stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
        stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
        stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
        tear_down = Attribute("tearDown", "Bash script to be executed before "
                "releasing the resource", 
                flags = Flags.ReadOnly)

        # Registration order is kept identical to the declaration order
        for attr in (command, forward_x11, env, sudo, depends, sources,
                code, build, install, stdin, stdout, stderr, tear_down):
            cls._register_attribute(attr)

    @classmethod
    def _register_traces(cls):
        """Declare the traces (remote output files) of a LinuxApplication."""
        stdout = Trace("stdout", "Standard output stream")
        stderr = Trace("stderr", "Standard error stream")
        buildlog = Trace("buildlog", "Output of the build process")

        for trace in (stdout, stderr, buildlog):
            cls._register_trace(trace)

    def __init__(self, ec, guid):
        super(LinuxApplication, self).__init__(ec, guid)
        self._pid = None
        self._ppid = None
        # Application home, relative to the node's experiment home
        self._home = "app-%s" % self.guid

        # timestamp of last state check of the application
        self._last_state_check = strfnow()

    def log_message(self, msg):
        """Prefix ``msg`` with the resource guid and the node hostname."""
        # The node may not be connected yet (deploy explicitly handles a
        # missing node), so do not dereference it unconditionally.
        node = self.node
        hostname = node.get("hostname") if node else None
        return " guid %d - host %s - %s " % (self.guid, hostname, msg)

    @property
    def node(self):
        """The first connected LinuxNode, or None if there is none."""
        node = self.get_connected(LinuxNode.rtype())
        if node: return node[0]
        return None

    @property
    def app_home(self):
        """Application home directory: ``<node exp_home>/app-<guid>``."""
        return os.path.join(self.node.exp_home, self._home)

    @property
    def src_dir(self):
        """Directory where sources and code are uploaded (``app_home/src``)."""
        return os.path.join(self.app_home, 'src')

    @property
    def build_dir(self):
        """Directory created for the build step (``app_home/build``)."""
        return os.path.join(self.app_home, 'build')

    @property
    def pid(self):
        """Pid of the background process started by ``start``, or None."""
        return self._pid

    @property
    def ppid(self):
        """Parent pid of the background process started by ``start``, or None."""
        return self._ppid

    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """Retrieve information about the trace file ``name``.

        The trace is the file ``name`` inside the application home
        directory on the node.

        :param name: Name of the trace file (e.g. 'stdout', 'stderr').
        :param attr: What to retrieve: TraceAttr.PATH (remote path),
            TraceAttr.ALL (whole content), TraceAttr.STREAM (one chunk of
            ``block`` bytes after skipping ``offset`` blocks) or
            TraceAttr.SIZE (file size, as an int).
        :param block: Block size in bytes, used with TraceAttr.STREAM.
        :param offset: Number of blocks to skip, used with TraceAttr.STREAM.
        :returns: The requested information, or None if the trace does not
            exist, could not be read, or ``attr`` is not recognized.
        """
        self.info("Retrieving '%s' trace %s " % (name, attr))

        path = os.path.join(self.app_home, name)

        command = "(test -f %s && echo 'success') || echo 'error'" % path
        (out, err), proc = self.node.execute(command)

        if (err and proc.poll()) or out.find("error") != -1:
            msg = " Couldn't find trace %s " % name
            self.error(msg, out, err)
            return None

        if attr == TraceAttr.PATH:
            return path

        if attr == TraceAttr.ALL:
            (out, err), proc = self.node.check_output(self.app_home, name)

            if err and proc.poll():
                msg = " Couldn't read trace %s " % name
                self.error(msg, out, err)
                return None

            return out

        if attr == TraceAttr.STREAM:
            cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
        elif attr == TraceAttr.SIZE:
            cmd = "stat -c%%s %s " % path
        else:
            # No retrieval command exists for any other attribute value
            msg = " Unknown trace attribute %s " % attr
            self.error(msg)
            return None

        (out, err), proc = self.node.execute(cmd)

        if err and proc.poll():
            msg = " Couldn't find trace %s " % name
            self.error(msg, out, err)
            return None

        if attr == TraceAttr.SIZE:
            out = int(out.strip())

        return out

    def provision(self):
        """Prepare the application on the node.

        Creates the application home directory, uploads sources, code and
        stdin, installs dependencies, runs the build and install commands,
        and uploads the main command to ``app.sh``. Commands that require
        X11 forwarding are not uploaded because ``start`` executes them
        directly.
        """
        # create home dir for application
        self.node.mkdir(self.app_home)

        # upload sources
        self.upload_sources()

        # upload code
        self.upload_code()

        # upload stdin
        self.upload_stdin()

        # install dependencies
        self.install_dependencies()

        # build
        self.build()

        # Install
        self.install()

        # Upload command
        command = self.get("command")
        x11 = self.get("forwardX11")
        env = self.get("env")

        if command and not x11:
            self.info("Uploading command '%s'" % command)

            # replace application specific paths in the command
            command = self.replace_paths(command)

            self.node.upload_command(command, self.app_home, 
                    shfile = "app.sh",
                    env = env)

        super(LinuxApplication, self).provision()

    def upload_sources(self):
        """Deploy the files listed in the ``sources`` attribute.

        Entries starting with ``http://`` or ``https://`` are downloaded on
        the node itself with wget into ${SOURCES}; all other entries are
        uploaded from the local machine to ``src_dir``.
        """
        # TODO: check if sources need to be uploaded and upload them
        sources = self.get("sources")
        if not sources:
            return

        self.info(" Uploading sources ")

        # create dir for sources
        self.node.mkdir(self.src_dir)

        # split() without arguments ignores repeated, leading and trailing
        # whitespace, so no empty entries are produced
        url_prefixes = ("http://", "https://")
        entries = sources.split()
        http_sources = [s for s in entries if s.startswith(url_prefixes)]
        local_sources = [s for s in entries if not s.startswith(url_prefixes)]

        # Download http sources remotely
        if http_sources:
            command = " wget -c --directory-prefix=${SOURCES} "
            check = ""

            for source in http_sources:
                command += " %s " % (source)
                check += " ls ${SOURCES}/%s ;" % os.path.basename(source)

            # Append the command to check that the sources were downloaded
            command += " ; %s " % check

            # replace application specific paths in the command
            command = self.replace_paths(command)

            # Upload the command to a file, and execute asynchronously
            self.node.run_and_wait(command, self.app_home,
                    shfile = "http_sources.sh",
                    pidfile = "http_sources_pidfile", 
                    ecodefile = "http_sources_exitcode", 
                    stdout = "http_sources_stdout", 
                    stderr = "http_sources_stderr")

        if local_sources:
            self.node.upload(local_sources, self.src_dir)

    def upload_code(self):
        """Upload the ``code`` attribute text to ``src_dir/code``."""
        code = self.get("code")
        if code:
            # create dir for sources
            self.node.mkdir(self.src_dir)

            self.info(" Uploading code ")

            dst = os.path.join(self.src_dir, "code")
            self.node.upload(code, dst, text = True)

    def upload_stdin(self):
        """Upload the ``stdin`` attribute text to ``app_home/stdin``."""
        stdin = self.get("stdin")
        if stdin:
            self.info(" Uploading stdin ")

            # The file is stored directly in the application home
            dst = os.path.join(self.app_home, "stdin")
            self.node.upload(stdin, dst, text = True)

    def install_dependencies(self):
        """Install the packages listed in ``depends`` on the node."""
        depends = self.get("depends")
        if depends:
            self.info(" Installing dependencies %s" % depends)
            self.node.install_packages(depends, self.app_home)

    def build(self):
        """Run the ``build`` commands on the node and wait for them.

        The build directory is created first, and path tags such as
        ${SOURCES} are replaced before the commands run.
        """
        build = self.get("build")
        if build:
            self.info(" Building sources ")

            # create dir for build
            self.node.mkdir(self.build_dir)

            # replace application specific paths in the command
            command = self.replace_paths(build)

            # Upload the command to a file, and execute asynchronously
            self.node.run_and_wait(command, self.app_home,
                    shfile = "build.sh",
                    pidfile = "build_pidfile", 
                    ecodefile = "build_exitcode", 
                    stdout = "build_stdout", 
                    stderr = "build_stderr")

    def install(self):
        """Run the ``install`` commands on the node and wait for them."""
        install = self.get("install")
        if install:
            self.info(" Installing sources ")

            # replace application specific paths in the command
            command = self.replace_paths(install)

            # Upload the command to a file, and execute asynchronously
            self.node.run_and_wait(command, self.app_home,
                    shfile = "install.sh",
                    pidfile = "install_pidfile", 
                    ecodefile = "install_exitcode", 
                    stdout = "install_stdout", 
                    stderr = "install_stderr")

    def deploy(self):
        """Discover and provision the application once its node is READY.

        If no node is connected yet, or the node is not READY, deployment
        is rescheduled every 0.5s. Any error while discovering or
        provisioning marks the application FAILED and is re-raised.
        """
        # Wait until node is associated and deployed
        node = self.node
        if not node or node.state < ResourceState.READY:
            node_state = node.state if node else None
            self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % node_state)

            reschedule_delay = "0.5s"
            self.ec.schedule(reschedule_delay, self.deploy)
            return

        try:
            command = self.get("command") or ""
            self.info("Deploying command '%s' " % command)
            self.discover()
            self.provision()
        except:
            # Mark the failure for any error, then propagate it unchanged
            self._state = ResourceState.FAILED
            raise

        super(LinuxApplication, self).deploy()

    def start(self):
        """Start the application command.

        Without a command, the application is marked FINISHED right away.
        If X11 forwarding was requested as well, the application is instead
        marked FAILED and RuntimeError is raised. With X11 forwarding the
        command is executed synchronously and the application becomes
        FINISHED when it returns. Otherwise the previously uploaded
        ``app.sh`` is run in the background and its pid/ppid are recorded.

        :raises RuntimeError: if the command can not be started or fails.
        """
        command = self.get('command')
        env = self.get('env')
        stdin = 'stdin' if self.get('stdin') else None
        # Output is always redirected to the 'stdout' and 'stderr' files,
        # whether or not those attributes are set.
        stdout = 'stdout'
        stderr = 'stderr'
        sudo = self.get('sudo') or False
        x11 = self.get('forwardX11') or False

        if not command:
            if x11:
                msg = "No command is defined but X11 forwarding has been set"
                self.error(msg)
                self._state = ResourceState.FAILED
                raise RuntimeError(msg)

            # If no command was given, then the application 
            # is directly marked as FINISHED: there is nothing to run.
            self._state = ResourceState.FINISHED
            return

        super(LinuxApplication, self).start()

        self.info("Starting command '%s'" % command)

        if x11:
            # If X11 forwarding was specified, then the application
            # can not run detached, so instead of invoking asynchronous
            # 'run' we invoke synchronous 'execute'.
            if env:
                # Export each space-separated VAR=VALUE entry so that the
                # command (run in a subshell) inherits it
                environ = " ; ".join("export %s" % var for var in env.split())
                command = "(" + environ + " ; " + command + ")"

            # Path tags must be replaced whether or not env was given
            command = self.replace_paths(command)

            (out, err), proc = self.node.execute(command,
                    sudo = sudo,
                    stdin = stdin,
                    forward_x11 = x11)

            if proc.poll() and err:
                msg = " Failed to execute command '%s' " % command
                self.error(msg, out, err)
                self._state = ResourceState.FAILED
                raise RuntimeError(msg)

            self._state = ResourceState.FINISHED
            return

        # Command was previously uploaded, now run the remote
        # bash file asynchronously
        cmd = "bash ./app.sh"
        (out, err), proc = self.node.run(cmd, self.app_home, 
            stdin = stdin, 
            stdout = stdout,
            stderr = stderr,
            sudo = sudo)

        # check if execution errors occurred
        msg = " Failed to start command '%s' " % command

        if proc.poll() and err:
            self.error(msg, out, err)
            self._state = ResourceState.FAILED
            raise RuntimeError(msg)

        # Check status of process running in background
        pid, ppid = self.node.wait_pid(self.app_home)
        if pid: self._pid = int(pid)
        if ppid: self._ppid = int(ppid)

        # If the process is not running, check for error information
        # on the remote machine
        if not self.pid or not self.ppid:
            (out, err), proc = self.node.check_output(self.app_home, 'stderr')
            self.error(msg, out, err)

            msg2 = " Setting state to Failed"
            self.debug(msg2)
            self._state = ResourceState.FAILED

            raise RuntimeError(msg)

    def stop(self):
        """Kill the background process if the application is STARTED.

        Any output or error reported by the kill marks the application
        FAILED; otherwise the base-class stop is invoked.
        """
        command = self.get('command') or ''

        if self.state == ResourceState.STARTED:
            self.info("Stopping command '%s'" % command)

            (out, err), proc = self.node.kill(self.pid, self.ppid)

            if out or err:
                # check if execution errors occurred
                msg = " Failed to STOP command '%s' " % command
                self.error(msg, out, err)
                self._state = ResourceState.FAILED
            else:
                super(LinuxApplication, self).stop()

    def release(self):
        """Run the ``tearDown`` script, stop the application and release it.

        The base-class release only runs if the application ends up STOPPED.
        """
        # NOTE(review): applications that are not STOPPED after stop()
        # (e.g. FINISHED ones) skip the base-class release — confirm intent.
        self.info("Releasing resource")

        tear_down = self.get("tearDown")
        if tear_down:
            self.node.execute(tear_down)

        self.stop()
        if self.state == ResourceState.STOPPED:
            super(LinuxApplication, self).release()

    @property
    def state(self):
        """Current state, refreshed from the node while STARTED.

        While the application is STARTED, the remote host is queried at
        most every 0.5 seconds. Errors reported by ``check_errors`` mark
        the application FAILED, except a "No such file or directory" error,
        which is reported as READY (the command has not run yet). A
        finished background process marks it FINISHED.
        """
        if self._state == ResourceState.STARTED:
            # To avoid overwhelming the remote hosts and the local processor
            # with too many ssh queries, the state is only requested
            # every 'state_check_delay' seconds.
            state_check_delay = 0.5
            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                # check if execution errors occurred
                (out, err), proc = self.node.check_errors(self.app_home)

                if out or err:
                    if err and err.find("No such file or directory") >= 0:
                        # The resource is marked as started, but the
                        # command was not yet executed
                        return ResourceState.READY

                    msg = " Failed to execute command '%s'" % self.get("command")
                    self.error(msg, out, err)
                    self._state = ResourceState.FAILED

                elif self.pid and self.ppid:
                    status = self.node.status(self.pid, self.ppid)

                    if status == ProcStatus.FINISHED:
                        self._state = ResourceState.FINISHED

                self._last_state_check = strfnow()

        return self._state

    def replace_paths(self, command):
        """
        Replace the special path tags in ``command`` with actual paths.

        Supported tags are ${SOURCES}, ${BUILD}, ${APP_HOME}, ${NODE_HOME}
        and ${EXP_HOME}. Relative directories are prefixed with ${HOME},
        which is left for the remote shell to expand. Paths are NOT
        shell-escaped.
        """
        def absolute_dir(d):
            return d if d.startswith("/") else os.path.join("${HOME}", d)

        return ( command
            .replace("${SOURCES}", absolute_dir(self.src_dir))
            .replace("${BUILD}", absolute_dir(self.build_dir))
            .replace("${APP_HOME}", absolute_dir(self.app_home))
            .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
            .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
            )

    def valid_connection(self, guid):
        """Return whether a connection to resource ``guid`` is valid.

        No validation is performed yet: every connection is accepted.
        """
        # TODO: Validate! (e.g. what if it is connected to more than one node?)
        return True
522