Added LICENSE
[nepi.git] / src / nepi / resources / linux / application.py
1 """
2     NEPI, a framework to manage network experiments
3     Copyright (C) 2013 INRIA
4
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 """
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util import sshfuncs 
25 from nepi.util.timefuncs import strfnow, strfdiff
26
27 import logging
28 import os
29
30 reschedule_delay = "0.5s"
31 state_check_delay = 1
32
33 # TODO: Resolve wildcards in commands!! 
34
35 @clsinit
36 class LinuxApplication(ResourceManager):
37     _rtype = "LinuxApplication"
38
39     @classmethod
40     def _register_attributes(cls):
41         command = Attribute("command", "Command to execute", 
42                 flags = Flags.ExecReadOnly)
43         forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
44                 flags = Flags.ExecReadOnly)
45         env = Attribute("env", "Environment variables string for command execution",
46                 flags = Flags.ExecReadOnly)
47         sudo = Attribute("sudo", "Run with root privileges", 
48                 flags = Flags.ExecReadOnly)
49         depends = Attribute("depends", 
50                 "Space-separated list of packages required to run the application",
51                 flags = Flags.ExecReadOnly)
52         sources = Attribute("sources", 
53                 "Space-separated list of regular files to be deployed in the working "
54                 "path prior to building. Archives won't be expanded automatically.",
55                 flags = Flags.ExecReadOnly)
56         code = Attribute("code", 
57                 "Plain text source code to be uploaded to the server. It will be stored "
58                 "under ${SOURCES}/code",
59                 flags = Flags.ExecReadOnly)
60         build = Attribute("build", 
61                 "Build commands to execute after deploying the sources. "
62                 "Sources will be in the ${SOURCES} folder. "
63                 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
64                 "Try to make the commands return with a nonzero exit code on error.\n"
65                 "Also, do not install any programs here, use the 'install' attribute. This will "
66                 "help keep the built files constrained to the build folder (which may "
67                 "not be the home folder), and will result in faster deployment. Also, "
68                 "make sure to clean up temporary files, to reduce bandwidth usage between "
69                 "nodes when transferring built packages.",
70                 flags = Flags.ReadOnly)
71         install = Attribute("install", 
72                 "Commands to transfer built files to their final destinations. "
73                 "Sources will be in the initial working folder, and a special "
74                 "tag ${SOURCES} can be used to reference the experiment's "
75                 "home folder (where the application commands will run).\n"
76                 "ALL sources and targets needed for execution must be copied there, "
77                 "if building has been enabled.\n"
78                 "That is, 'slave' nodes will not automatically get any source files. "
79                 "'slave' nodes don't get build dependencies either, so if you need "
80                 "make and other tools to install, be sure to provide them as "
81                 "actual dependencies instead.",
82                 flags = Flags.ReadOnly)
83         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
84         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
85         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
86         tear_down = Attribute("tearDown", "Bash script to be executed before "
87                 "releasing the resource", 
88                 flags = Flags.ReadOnly)
89
90         cls._register_attribute(command)
91         cls._register_attribute(forward_x11)
92         cls._register_attribute(env)
93         cls._register_attribute(sudo)
94         cls._register_attribute(depends)
95         cls._register_attribute(sources)
96         cls._register_attribute(code)
97         cls._register_attribute(build)
98         cls._register_attribute(install)
99         cls._register_attribute(stdin)
100         cls._register_attribute(stdout)
101         cls._register_attribute(stderr)
102         cls._register_attribute(tear_down)
103
104     @classmethod
105     def _register_traces(cls):
106         stdout = Trace("stdout", "Standard output stream")
107         stderr = Trace("stderr", "Standard error stream")
108         buildlog = Trace("buildlog", "Output of the build process")
109
110         cls._register_trace(stdout)
111         cls._register_trace(stderr)
112         cls._register_trace(buildlog)
113
114     def __init__(self, ec, guid):
115         super(LinuxApplication, self).__init__(ec, guid)
116         self._pid = None
117         self._ppid = None
118         self._home = "app-%s" % self.guid
119
120         # timestamp of last state check of the application
121         self._last_state_check = strfnow()
122
123         self._logger = logging.getLogger("LinuxApplication")
124     
125     def log_message(self, msg):
126         return " guid %d - host %s - %s " % (self.guid, 
127                 self.node.get("hostname"), msg)
128
129     @property
130     def node(self):
131         node = self.get_connected(LinuxNode.rtype())
132         if node: return node[0]
133         return None
134
135     @property
136     def app_home(self):
137         return os.path.join(self.node.exp_home, self._home)
138
139     @property
140     def src_dir(self):
141         return os.path.join(self.app_home, 'src')
142
143     @property
144     def build_dir(self):
145         return os.path.join(self.app_home, 'build')
146
147     @property
148     def pid(self):
149         return self._pid
150
151     @property
152     def ppid(self):
153         return self._ppid
154
155     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
156         self.info("Retrieving '%s' trace %s " % (name, attr))
157
158         path = os.path.join(self.app_home, name)
159         
160         command = "(test -f %s && echo 'success') || echo 'error'" % path
161         (out, err), proc = self.node.execute(command)
162
163         if (err and proc.poll()) or out.find("error") != -1:
164             msg = " Couldn't find trace %s " % name
165             self.error(msg, out, err)
166             return None
167     
168         if attr == TraceAttr.PATH:
169             return path
170
171         if attr == TraceAttr.ALL:
172             (out, err), proc = self.node.check_output(self.app_home, name)
173             
174             if err and proc.poll():
175                 msg = " Couldn't read trace %s " % name
176                 self.error(msg, out, err)
177                 return None
178
179             return out
180
181         if attr == TraceAttr.STREAM:
182             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
183         elif attr == TraceAttr.SIZE:
184             cmd = "stat -c%%s %s " % path
185
186         (out, err), proc = self.node.execute(cmd)
187
188         if err and proc.poll():
189             msg = " Couldn't find trace %s " % name
190             self.error(msg, out, err)
191             return None
192         
193         if attr == TraceAttr.SIZE:
194             out = int(out.strip())
195
196         return out
197             
198     def provision(self, filters = None):
199         # create home dir for application
200         self.node.mkdir(self.app_home)
201
202         # upload sources
203         self.upload_sources()
204
205         # upload code
206         self.upload_code()
207
208         # upload stdin
209         self.upload_stdin()
210
211         # install dependencies
212         self.install_dependencies()
213
214         # build
215         self.build()
216
217         # Install
218         self.install()
219
220         command = self.get("command")
221         x11 = self.get("forwardX11")
222         if not x11 and command:
223             self.info("Uploading command '%s'" % command)
224
225             # Export environment
226             environ = ""
227             env = self.get("env") or ""
228             for var in env.split(" "):
229                 environ += 'export %s\n' % var
230
231             command = environ + command
232
233             # If the command runs asynchronous, pre upload the command 
234             # to the app.sh file in the remote host
235             dst = os.path.join(self.app_home, "app.sh")
236             command = self.replace_paths(command)
237             self.node.upload(command, dst, text = True)
238
239         super(LinuxApplication, self).provision()
240
241     def upload_sources(self):
242         # TODO: check if sources need to be uploaded and upload them
243         sources = self.get("sources")
244         if sources:
245             self.info(" Uploading sources ")
246
247             # create dir for sources
248             self.node.mkdir(self.src_dir)
249
250             sources = sources.split(' ')
251
252             http_sources = list()
253             for source in list(sources):
254                 if source.startswith("http") or source.startswith("https"):
255                     http_sources.append(source)
256                     sources.remove(source)
257
258             # Download http sources
259             if http_sources:
260                 cmd = " wget -c --directory-prefix=${SOURCES} "
261                 verif = ""
262
263                 for source in http_sources:
264                     cmd += " %s " % (source)
265                     verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
266                 
267                 # Wget output goes to stderr :S
268                 cmd += " 2> /dev/null ; "
269
270                 # Add verification
271                 cmd += " %s " % verif
272
273                 # Upload the command to a file, and execute asynchronously
274                 self.upload_and_run(cmd, 
275                         "http_sources.sh", "http_sources_pid", 
276                         "http_sources_out", "http_sources_err")
277             if sources:
278                 self.node.upload(sources, self.src_dir)
279
280     def upload_code(self):
281         code = self.get("code")
282         if code:
283             # create dir for sources
284             self.node.mkdir(self.src_dir)
285
286             self.info(" Uploading code ")
287
288             dst = os.path.join(self.src_dir, "code")
289             self.node.upload(sources, dst, text = True)
290
291     def upload_stdin(self):
292         stdin = self.get("stdin")
293         if stdin:
294             # create dir for sources
295             self.info(" Uploading stdin ")
296
297             dst = os.path.join(self.app_home, "stdin")
298             self.node.upload(stdin, dst, text = True)
299
300     def install_dependencies(self):
301         depends = self.get("depends")
302         if depends:
303             self.info(" Installing dependencies %s" % depends)
304             self.node.install_packages(depends, home = self.app_home)
305
306     def build(self):
307         build = self.get("build")
308         if build:
309             self.info(" Building sources ")
310             
311             # create dir for build
312             self.node.mkdir(self.build_dir)
313
314             # Upload the command to a file, and execute asynchronously
315             self.upload_and_run(build, 
316                     "build.sh", "build_pid", 
317                     "build_out", "build_err")
318  
319     def install(self):
320         install = self.get("install")
321         if install:
322             self.info(" Installing sources ")
323
324             # Upload the command to a file, and execute asynchronously
325             self.upload_and_run(install, 
326                     "install.sh", "install_pid", 
327                     "install_out", "install_err")
328
329     def deploy(self):
330         # Wait until node is associated and deployed
331         node = self.node
332         if not node or node.state < ResourceState.READY:
333             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
334             self.ec.schedule(reschedule_delay, self.deploy)
335         else:
336             try:
337                 command = self.get("command") or ""
338                 self.info(" Deploying command '%s' " % command)
339                 self.discover()
340                 self.provision()
341             except:
342                 self._state = ResourceState.FAILED
343                 raise
344
345             super(LinuxApplication, self).deploy()
346
347     def start(self):
348         command = self.get('command')
349         env = self.get('env')
350         stdin = 'stdin' if self.get('stdin') else None
351         stdout = 'stdout' if self.get('stdout') else 'stdout'
352         stderr = 'stderr' if self.get('stderr') else 'stderr'
353         sudo = self.get('sudo') or False
354         x11 = self.get('forwardX11') or False
355         failed = False
356
357         super(LinuxApplication, self).start()
358
359         if not command:
360             self.info("No command to start ")
361             self._state = ResourceState.FINISHED
362             return 
363     
364         self.info("Starting command '%s'" % command)
365
366         if x11:
367             if env:
368                 # Export environment
369                 environ = ""
370                 for var in env.split(" "):
371                     environ += ' %s ' % var
372
373                 command = "(" + environ + " ; " + command + ")"
374                 command = self.replace_paths(command)
375
376             # If the command requires X11 forwarding, we
377             # can't run it asynchronously
378             (out, err), proc = self.node.execute(command,
379                     sudo = sudo,
380                     stdin = stdin,
381                     forward_x11 = x11)
382
383             self._state = ResourceState.FINISHED
384
385             if proc.poll() and err:
386                 failed = True
387         else:
388             # Command was  previously uploaded, now run the remote
389             # bash file asynchronously
390             cmd = "bash ./app.sh"
391             (out, err), proc = self.node.run(cmd, self.app_home, 
392                 stdin = stdin, 
393                 stdout = stdout,
394                 stderr = stderr,
395                 sudo = sudo)
396
397             if proc.poll() and err:
398                 failed = True
399         
400             if not failed:
401                 pid, ppid = self.node.wait_pid(home = self.app_home)
402                 if pid: self._pid = int(pid)
403                 if ppid: self._ppid = int(ppid)
404
405             if not self.pid or not self.ppid:
406                 failed = True
407  
408             (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
409
410             if failed or out or chkerr:
411                 # check if execution errors occurred
412                 msg = " Failed to start command '%s' " % command
413                 out = out
414                 if err:
415                     err = err
416                 elif chkerr:
417                     err = chkerr
418
419                 self.error(msg, out, err)
420
421                 msg2 = " Setting state to Failed"
422                 self.debug(msg2)
423                 self._state = ResourceState.FAILED
424
425                 raise RuntimeError, msg
426
427     def stop(self):
428         command = self.get('command') or ''
429         state = self.state
430         
431         if state == ResourceState.STARTED:
432             self.info("Stopping command '%s'" % command)
433
434             (out, err), proc = self.node.kill(self.pid, self.ppid)
435
436             if out or err:
437                 # check if execution errors occurred
438                 msg = " Failed to STOP command '%s' " % self.get("command")
439                 self.error(msg, out, err)
440                 self._state = ResourceState.FAILED
441                 stopped = False
442             else:
443                 super(LinuxApplication, self).stop()
444
445     def release(self):
446         self.info("Releasing resource")
447
448         tear_down = self.get("tearDown")
449         if tear_down:
450             self.node.execute(tear_down)
451
452         self.stop()
453         if self.state == ResourceState.STOPPED:
454             super(LinuxApplication, self).release()
455     
456     @property
457     def state(self):
458         if self._state == ResourceState.STARTED:
459             # To avoid overwhelming the remote hosts and the local processor
460             # with too many ssh queries, the state is only requested
461             # every 'state_check_delay' .
462             if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
463                 # check if execution errors occurred
464                 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
465
466                 if out or err:
467                     if err.find("No such file or directory") >= 0 :
468                         # The resource is marked as started, but the
469                         # command was not yet executed
470                         return ResourceState.READY
471
472                     msg = " Failed to execute command '%s'" % self.get("command")
473                     self.error(msg, out, err)
474                     self._state = ResourceState.FAILED
475
476                 elif self.pid and self.ppid:
477                     status = self.node.status(self.pid, self.ppid)
478
479                     if status == sshfuncs.FINISHED:
480                         self._state = ResourceState.FINISHED
481
482
483                 self._last_state_check = strfnow()
484
485         return self._state
486
487     def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
488         dst = os.path.join(self.app_home, fname)
489         cmd = self.replace_paths(cmd)
490         self.node.upload(cmd, dst, text = True)
491
492         cmd = "bash ./%s" % fname
493         (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
494             pidfile = pidfile,
495             stdout = outfile, 
496             stderr = errfile, 
497             raise_on_error = True)
498
499     def replace_paths(self, command):
500         """
501         Replace all special path tags with shell-escaped actual paths.
502         """
503         def absolute_dir(d):
504             return d if d.startswith("/") else os.path.join("${HOME}", d)
505
506         return ( command
507             .replace("${SOURCES}", absolute_dir(self.src_dir))
508             .replace("${BUILD}", absolute_dir(self.build_dir))
509             .replace("${APP_HOME}", absolute_dir(self.app_home))
510             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
511             .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
512             )
513         
514     def valid_connection(self, guid):
515         # TODO: Validate!
516         return True
517         # XXX: What if it is connected to more than one node?
518         resources = self.find_resources(exact_tags = [tags.NODE])
519         self._node = resources[0] if len(resources) == 1 else None
520         return self._node
521