Adding authors and correcting licence information
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util import sshfuncs 
25 from nepi.util.timefuncs import strfnow, strfdiff
26
27 import os
28
29 reschedule_delay = "0.5s"
30 state_check_delay = 1
31
32 # TODO: Resolve wildcards in commands!!
33 # TODO: If command is not set give a warning but do not generate an error!
34
35 @clsinit
36 class LinuxApplication(ResourceManager):
37     _rtype = "LinuxApplication"
38
39     @classmethod
40     def _register_attributes(cls):
41         command = Attribute("command", "Command to execute", 
42                 flags = Flags.ExecReadOnly)
43         forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
44                 flags = Flags.ExecReadOnly)
45         env = Attribute("env", "Environment variables string for command execution",
46                 flags = Flags.ExecReadOnly)
47         sudo = Attribute("sudo", "Run with root privileges", 
48                 flags = Flags.ExecReadOnly)
49         depends = Attribute("depends", 
50                 "Space-separated list of packages required to run the application",
51                 flags = Flags.ExecReadOnly)
52         sources = Attribute("sources", 
53                 "Space-separated list of regular files to be deployed in the working "
54                 "path prior to building. Archives won't be expanded automatically.",
55                 flags = Flags.ExecReadOnly)
56         code = Attribute("code", 
57                 "Plain text source code to be uploaded to the server. It will be stored "
58                 "under ${SOURCES}/code",
59                 flags = Flags.ExecReadOnly)
60         build = Attribute("build", 
61                 "Build commands to execute after deploying the sources. "
62                 "Sources will be in the ${SOURCES} folder. "
63                 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
64                 "Try to make the commands return with a nonzero exit code on error.\n"
65                 "Also, do not install any programs here, use the 'install' attribute. This will "
66                 "help keep the built files constrained to the build folder (which may "
67                 "not be the home folder), and will result in faster deployment. Also, "
68                 "make sure to clean up temporary files, to reduce bandwidth usage between "
69                 "nodes when transferring built packages.",
70                 flags = Flags.ReadOnly)
71         install = Attribute("install", 
72                 "Commands to transfer built files to their final destinations. "
73                 "Sources will be in the initial working folder, and a special "
74                 "tag ${SOURCES} can be used to reference the experiment's "
75                 "home folder (where the application commands will run).\n"
76                 "ALL sources and targets needed for execution must be copied there, "
77                 "if building has been enabled.\n"
78                 "That is, 'slave' nodes will not automatically get any source files. "
79                 "'slave' nodes don't get build dependencies either, so if you need "
80                 "make and other tools to install, be sure to provide them as "
81                 "actual dependencies instead.",
82                 flags = Flags.ReadOnly)
83         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
84         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
85         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
86         tear_down = Attribute("tearDown", "Bash script to be executed before "
87                 "releasing the resource", 
88                 flags = Flags.ReadOnly)
89
90         cls._register_attribute(command)
91         cls._register_attribute(forward_x11)
92         cls._register_attribute(env)
93         cls._register_attribute(sudo)
94         cls._register_attribute(depends)
95         cls._register_attribute(sources)
96         cls._register_attribute(code)
97         cls._register_attribute(build)
98         cls._register_attribute(install)
99         cls._register_attribute(stdin)
100         cls._register_attribute(stdout)
101         cls._register_attribute(stderr)
102         cls._register_attribute(tear_down)
103
104     @classmethod
105     def _register_traces(cls):
106         stdout = Trace("stdout", "Standard output stream")
107         stderr = Trace("stderr", "Standard error stream")
108         buildlog = Trace("buildlog", "Output of the build process")
109
110         cls._register_trace(stdout)
111         cls._register_trace(stderr)
112         cls._register_trace(buildlog)
113
114     def __init__(self, ec, guid):
115         super(LinuxApplication, self).__init__(ec, guid)
116         self._pid = None
117         self._ppid = None
118         self._home = "app-%s" % self.guid
119
120         # timestamp of last state check of the application
121         self._last_state_check = strfnow()
122     
123     def log_message(self, msg):
124         return " guid %d - host %s - %s " % (self.guid, 
125                 self.node.get("hostname"), msg)
126
127     @property
128     def node(self):
129         node = self.get_connected(LinuxNode.rtype())
130         if node: return node[0]
131         return None
132
133     @property
134     def app_home(self):
135         return os.path.join(self.node.exp_home, self._home)
136
137     @property
138     def src_dir(self):
139         return os.path.join(self.app_home, 'src')
140
141     @property
142     def build_dir(self):
143         return os.path.join(self.app_home, 'build')
144
145     @property
146     def pid(self):
147         return self._pid
148
149     @property
150     def ppid(self):
151         return self._ppid
152
153     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
154         self.info("Retrieving '%s' trace %s " % (name, attr))
155
156         path = os.path.join(self.app_home, name)
157         
158         command = "(test -f %s && echo 'success') || echo 'error'" % path
159         (out, err), proc = self.node.execute(command)
160
161         if (err and proc.poll()) or out.find("error") != -1:
162             msg = " Couldn't find trace %s " % name
163             self.error(msg, out, err)
164             return None
165     
166         if attr == TraceAttr.PATH:
167             return path
168
169         if attr == TraceAttr.ALL:
170             (out, err), proc = self.node.check_output(self.app_home, name)
171             
172             if err and proc.poll():
173                 msg = " Couldn't read trace %s " % name
174                 self.error(msg, out, err)
175                 return None
176
177             return out
178
179         if attr == TraceAttr.STREAM:
180             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
181         elif attr == TraceAttr.SIZE:
182             cmd = "stat -c%%s %s " % path
183
184         (out, err), proc = self.node.execute(cmd)
185
186         if err and proc.poll():
187             msg = " Couldn't find trace %s " % name
188             self.error(msg, out, err)
189             return None
190         
191         if attr == TraceAttr.SIZE:
192             out = int(out.strip())
193
194         return out
195             
196     def provision(self):
197         # create home dir for application
198         self.node.mkdir(self.app_home)
199
200         # upload sources
201         self.upload_sources()
202
203         # upload code
204         self.upload_code()
205
206         # upload stdin
207         self.upload_stdin()
208
209         # install dependencies
210         self.install_dependencies()
211
212         # build
213         self.build()
214
215         # Install
216         self.install()
217
218         command = self.get("command")
219         x11 = self.get("forwardX11")
220         if not x11 and command:
221             self.info("Uploading command '%s'" % command)
222
223             # Export environment
224             environ = ""
225             if self.get("env"):
226                 for var in self.get("env").split(" "):
227                     environ += 'export %s\n' % var
228
229             command = environ + command
230
231             # If the command runs asynchronous, pre upload the command 
232             # to the app.sh file in the remote host
233             dst = os.path.join(self.app_home, "app.sh")
234             command = self.replace_paths(command)
235             self.node.upload(command, dst, text = True)
236
237         super(LinuxApplication, self).provision()
238
239     def upload_sources(self):
240         # TODO: check if sources need to be uploaded and upload them
241         sources = self.get("sources")
242         if sources:
243             self.info(" Uploading sources ")
244
245             # create dir for sources
246             self.node.mkdir(self.src_dir)
247
248             sources = sources.split(' ')
249
250             http_sources = list()
251             for source in list(sources):
252                 if source.startswith("http") or source.startswith("https"):
253                     http_sources.append(source)
254                     sources.remove(source)
255
256             # Download http sources
257             if http_sources:
258                 cmd = " wget -c --directory-prefix=${SOURCES} "
259                 verif = ""
260
261                 for source in http_sources:
262                     cmd += " %s " % (source)
263                     verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
264                 
265                 # Wget output goes to stderr :S
266                 cmd += " 2> /dev/null ; "
267
268                 # Add verification
269                 cmd += " %s " % verif
270
271                 # Upload the command to a file, and execute asynchronously
272                 self.upload_and_run(cmd, 
273                         "http_sources.sh", "http_sources_pid", 
274                         "http_sources_out", "http_sources_err")
275             if sources:
276                 self.node.upload(sources, self.src_dir)
277
278     def upload_code(self):
279         code = self.get("code")
280         if code:
281             # create dir for sources
282             self.node.mkdir(self.src_dir)
283
284             self.info(" Uploading code ")
285
286             dst = os.path.join(self.src_dir, "code")
287             self.node.upload(sources, dst, text = True)
288
289     def upload_stdin(self):
290         stdin = self.get("stdin")
291         if stdin:
292             # create dir for sources
293             self.info(" Uploading stdin ")
294
295             dst = os.path.join(self.app_home, "stdin")
296             self.node.upload(stdin, dst, text = True)
297
298     def install_dependencies(self):
299         depends = self.get("depends")
300         if depends:
301             self.info(" Installing dependencies %s" % depends)
302             self.node.install_packages(depends, home = self.app_home)
303
304     def build(self):
305         build = self.get("build")
306         if build:
307             self.info(" Building sources ")
308             
309             # create dir for build
310             self.node.mkdir(self.build_dir)
311
312             # Upload the command to a file, and execute asynchronously
313             self.upload_and_run(build, 
314                     "build.sh", "build_pid", 
315                     "build_out", "build_err")
316  
317     def install(self):
318         install = self.get("install")
319         if install:
320             self.info(" Installing sources ")
321
322             # Upload the command to a file, and execute asynchronously
323             self.upload_and_run(install, 
324                     "install.sh", "install_pid", 
325                     "install_out", "install_err")
326
327     def deploy(self):
328         # Wait until node is associated and deployed
329         node = self.node
330         if not node or node.state < ResourceState.READY:
331             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
332             self.ec.schedule(reschedule_delay, self.deploy)
333         else:
334             try:
335                 command = self.get("command") or ""
336                 self.info("Deploying command '%s' " % command)
337                 self.discover()
338                 self.provision()
339             except:
340                 self._state = ResourceState.FAILED
341                 raise
342
343             super(LinuxApplication, self).deploy()
344
345     def start(self):
346         command = self.get('command')
347         env = self.get('env')
348         stdin = 'stdin' if self.get('stdin') else None
349         stdout = 'stdout' if self.get('stdout') else 'stdout'
350         stderr = 'stderr' if self.get('stderr') else 'stderr'
351         sudo = self.get('sudo') or False
352         x11 = self.get('forwardX11') or False
353         failed = False
354
355         super(LinuxApplication, self).start()
356
357         if not command:
358             self.info("No command to start ")
359             self._state = ResourceState.FINISHED
360             return 
361     
362         self.info("Starting command '%s'" % command)
363
364         if x11:
365             if env:
366                 # Export environment
367                 environ = ""
368                 for var in env.split(" "):
369                     environ += ' %s ' % var
370
371                 command = "(" + environ + " ; " + command + ")"
372                 command = self.replace_paths(command)
373
374             # If the command requires X11 forwarding, we
375             # can't run it asynchronously
376             (out, err), proc = self.node.execute(command,
377                     sudo = sudo,
378                     stdin = stdin,
379                     forward_x11 = x11)
380
381             self._state = ResourceState.FINISHED
382
383             if proc.poll() and err:
384                 failed = True
385         else:
386             # Command was  previously uploaded, now run the remote
387             # bash file asynchronously
388             cmd = "bash ./app.sh"
389             (out, err), proc = self.node.run(cmd, self.app_home, 
390                 stdin = stdin, 
391                 stdout = stdout,
392                 stderr = stderr,
393                 sudo = sudo)
394
395             if proc.poll() and err:
396                 failed = True
397         
398             if not failed:
399                 pid, ppid = self.node.wait_pid(home = self.app_home)
400                 if pid: self._pid = int(pid)
401                 if ppid: self._ppid = int(ppid)
402
403             if not self.pid or not self.ppid:
404                 failed = True
405  
406             (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
407
408             if failed or out or chkerr:
409                 # check if execution errors occurred
410                 msg = " Failed to start command '%s' " % command
411                 out = out
412                 if err:
413                     err = err
414                 elif chkerr:
415                     err = chkerr
416
417                 self.error(msg, out, err)
418
419                 msg2 = " Setting state to Failed"
420                 self.debug(msg2)
421                 self._state = ResourceState.FAILED
422
423                 raise RuntimeError, msg
424
425     def stop(self):
426         command = self.get('command') or ''
427         state = self.state
428         
429         if state == ResourceState.STARTED:
430             self.info("Stopping command '%s'" % command)
431
432             (out, err), proc = self.node.kill(self.pid, self.ppid)
433
434             if out or err:
435                 # check if execution errors occurred
436                 msg = " Failed to STOP command '%s' " % self.get("command")
437                 self.error(msg, out, err)
438                 self._state = ResourceState.FAILED
439                 stopped = False
440             else:
441                 super(LinuxApplication, self).stop()
442
443     def release(self):
444         self.info("Releasing resource")
445
446         tear_down = self.get("tearDown")
447         if tear_down:
448             self.node.execute(tear_down)
449
450         self.stop()
451         if self.state == ResourceState.STOPPED:
452             super(LinuxApplication, self).release()
453     
454     @property
455     def state(self):
456         if self._state == ResourceState.STARTED:
457             # To avoid overwhelming the remote hosts and the local processor
458             # with too many ssh queries, the state is only requested
459             # every 'state_check_delay' .
460             if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
461                 # check if execution errors occurred
462                 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
463
464                 if out or err:
465                     if err.find("No such file or directory") >= 0 :
466                         # The resource is marked as started, but the
467                         # command was not yet executed
468                         return ResourceState.READY
469
470                     msg = " Failed to execute command '%s'" % self.get("command")
471                     self.error(msg, out, err)
472                     self._state = ResourceState.FAILED
473
474                 elif self.pid and self.ppid:
475                     status = self.node.status(self.pid, self.ppid)
476
477                     if status == sshfuncs.FINISHED:
478                         self._state = ResourceState.FINISHED
479
480
481                 self._last_state_check = strfnow()
482
483         return self._state
484
485     def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
486         dst = os.path.join(self.app_home, fname)
487         cmd = self.replace_paths(cmd)
488         self.node.upload(cmd, dst, text = True)
489
490         cmd = "bash ./%s" % fname
491         (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
492             pidfile = pidfile,
493             stdout = outfile, 
494             stderr = errfile, 
495             raise_on_error = True)
496
497     def replace_paths(self, command):
498         """
499         Replace all special path tags with shell-escaped actual paths.
500         """
501         def absolute_dir(d):
502             return d if d.startswith("/") else os.path.join("${HOME}", d)
503
504         return ( command
505             .replace("${SOURCES}", absolute_dir(self.src_dir))
506             .replace("${BUILD}", absolute_dir(self.build_dir))
507             .replace("${APP_HOME}", absolute_dir(self.app_home))
508             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
509             .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
510             )
511         
512     def valid_connection(self, guid):
513         # TODO: Validate!
514         return True
515         # XXX: What if it is connected to more than one node?
516         resources = self.find_resources(exact_tags = [tags.NODE])
517         self._node = resources[0] if len(resources) == 1 else None
518         return self._node
519