change bug of dict
[nepi.git] / src / nepi / resources / linux / application.py
1 """
2     NEPI, a framework to manage network experiments
3     Copyright (C) 2013 INRIA
4
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 """
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util import sshfuncs 
25 from nepi.util.timefuncs import strfnow, strfdiff
26
27 import os
28
29 reschedule_delay = "0.5s"
30 state_check_delay = 1
31
32 # TODO: Resolve wildcards in commands!! 
33
34 @clsinit
35 class LinuxApplication(ResourceManager):
36     _rtype = "LinuxApplication"
37
38     @classmethod
39     def _register_attributes(cls):
40         command = Attribute("command", "Command to execute", 
41                 flags = Flags.ExecReadOnly)
42         forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
43                 flags = Flags.ExecReadOnly)
44         env = Attribute("env", "Environment variables string for command execution",
45                 flags = Flags.ExecReadOnly)
46         sudo = Attribute("sudo", "Run with root privileges", 
47                 flags = Flags.ExecReadOnly)
48         depends = Attribute("depends", 
49                 "Space-separated list of packages required to run the application",
50                 flags = Flags.ExecReadOnly)
51         sources = Attribute("sources", 
52                 "Space-separated list of regular files to be deployed in the working "
53                 "path prior to building. Archives won't be expanded automatically.",
54                 flags = Flags.ExecReadOnly)
55         code = Attribute("code", 
56                 "Plain text source code to be uploaded to the server. It will be stored "
57                 "under ${SOURCES}/code",
58                 flags = Flags.ExecReadOnly)
59         build = Attribute("build", 
60                 "Build commands to execute after deploying the sources. "
61                 "Sources will be in the ${SOURCES} folder. "
62                 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
63                 "Try to make the commands return with a nonzero exit code on error.\n"
64                 "Also, do not install any programs here, use the 'install' attribute. This will "
65                 "help keep the built files constrained to the build folder (which may "
66                 "not be the home folder), and will result in faster deployment. Also, "
67                 "make sure to clean up temporary files, to reduce bandwidth usage between "
68                 "nodes when transferring built packages.",
69                 flags = Flags.ReadOnly)
70         install = Attribute("install", 
71                 "Commands to transfer built files to their final destinations. "
72                 "Sources will be in the initial working folder, and a special "
73                 "tag ${SOURCES} can be used to reference the experiment's "
74                 "home folder (where the application commands will run).\n"
75                 "ALL sources and targets needed for execution must be copied there, "
76                 "if building has been enabled.\n"
77                 "That is, 'slave' nodes will not automatically get any source files. "
78                 "'slave' nodes don't get build dependencies either, so if you need "
79                 "make and other tools to install, be sure to provide them as "
80                 "actual dependencies instead.",
81                 flags = Flags.ReadOnly)
82         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
83         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
84         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
85         tear_down = Attribute("tearDown", "Bash script to be executed before "
86                 "releasing the resource", 
87                 flags = Flags.ReadOnly)
88
89         cls._register_attribute(command)
90         cls._register_attribute(forward_x11)
91         cls._register_attribute(env)
92         cls._register_attribute(sudo)
93         cls._register_attribute(depends)
94         cls._register_attribute(sources)
95         cls._register_attribute(code)
96         cls._register_attribute(build)
97         cls._register_attribute(install)
98         cls._register_attribute(stdin)
99         cls._register_attribute(stdout)
100         cls._register_attribute(stderr)
101         cls._register_attribute(tear_down)
102
103     @classmethod
104     def _register_traces(cls):
105         stdout = Trace("stdout", "Standard output stream")
106         stderr = Trace("stderr", "Standard error stream")
107         buildlog = Trace("buildlog", "Output of the build process")
108
109         cls._register_trace(stdout)
110         cls._register_trace(stderr)
111         cls._register_trace(buildlog)
112
113     def __init__(self, ec, guid):
114         super(LinuxApplication, self).__init__(ec, guid)
115         self._pid = None
116         self._ppid = None
117         self._home = "app-%s" % self.guid
118
119         # timestamp of last state check of the application
120         self._last_state_check = strfnow()
121     
122     def log_message(self, msg):
123         return " guid %d - host %s - %s " % (self.guid, 
124                 self.node.get("hostname"), msg)
125
126     @property
127     def node(self):
128         node = self.get_connected(LinuxNode.rtype())
129         if node: return node[0]
130         return None
131
132     @property
133     def app_home(self):
134         return os.path.join(self.node.exp_home, self._home)
135
136     @property
137     def src_dir(self):
138         return os.path.join(self.app_home, 'src')
139
140     @property
141     def build_dir(self):
142         return os.path.join(self.app_home, 'build')
143
144     @property
145     def pid(self):
146         return self._pid
147
148     @property
149     def ppid(self):
150         return self._ppid
151
152     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
153         self.info("Retrieving '%s' trace %s " % (name, attr))
154
155         path = os.path.join(self.app_home, name)
156         
157         command = "(test -f %s && echo 'success') || echo 'error'" % path
158         (out, err), proc = self.node.execute(command)
159
160         if (err and proc.poll()) or out.find("error") != -1:
161             msg = " Couldn't find trace %s " % name
162             self.error(msg, out, err)
163             return None
164     
165         if attr == TraceAttr.PATH:
166             return path
167
168         if attr == TraceAttr.ALL:
169             (out, err), proc = self.node.check_output(self.app_home, name)
170             
171             if err and proc.poll():
172                 msg = " Couldn't read trace %s " % name
173                 self.error(msg, out, err)
174                 return None
175
176             return out
177
178         if attr == TraceAttr.STREAM:
179             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
180         elif attr == TraceAttr.SIZE:
181             cmd = "stat -c%%s %s " % path
182
183         (out, err), proc = self.node.execute(cmd)
184
185         if err and proc.poll():
186             msg = " Couldn't find trace %s " % name
187             self.error(msg, out, err)
188             return None
189         
190         if attr == TraceAttr.SIZE:
191             out = int(out.strip())
192
193         return out
194             
195     def provision(self):
196         # create home dir for application
197         self.node.mkdir(self.app_home)
198
199         # upload sources
200         self.upload_sources()
201
202         # upload code
203         self.upload_code()
204
205         # upload stdin
206         self.upload_stdin()
207
208         # install dependencies
209         self.install_dependencies()
210
211         # build
212         self.build()
213
214         # Install
215         self.install()
216
217         command = self.get("command")
218         x11 = self.get("forwardX11")
219         if not x11 and command:
220             self.info("Uploading command '%s'" % command)
221
222             # Export environment
223             environ = ""
224             env = self.get("env") or ""
225             for var in env.split(" "):
226                 environ += 'export %s\n' % var
227
228             command = environ + command
229
230             # If the command runs asynchronous, pre upload the command 
231             # to the app.sh file in the remote host
232             dst = os.path.join(self.app_home, "app.sh")
233             command = self.replace_paths(command)
234             self.node.upload(command, dst, text = True)
235
236         super(LinuxApplication, self).provision()
237
238     def upload_sources(self):
239         # TODO: check if sources need to be uploaded and upload them
240         sources = self.get("sources")
241         if sources:
242             self.info(" Uploading sources ")
243
244             # create dir for sources
245             self.node.mkdir(self.src_dir)
246
247             sources = sources.split(' ')
248
249             http_sources = list()
250             for source in list(sources):
251                 if source.startswith("http") or source.startswith("https"):
252                     http_sources.append(source)
253                     sources.remove(source)
254
255             # Download http sources
256             if http_sources:
257                 cmd = " wget -c --directory-prefix=${SOURCES} "
258                 verif = ""
259
260                 for source in http_sources:
261                     cmd += " %s " % (source)
262                     verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
263                 
264                 # Wget output goes to stderr :S
265                 cmd += " 2> /dev/null ; "
266
267                 # Add verification
268                 cmd += " %s " % verif
269
270                 # Upload the command to a file, and execute asynchronously
271                 self.upload_and_run(cmd, 
272                         "http_sources.sh", "http_sources_pid", 
273                         "http_sources_out", "http_sources_err")
274             if sources:
275                 self.node.upload(sources, self.src_dir)
276
277     def upload_code(self):
278         code = self.get("code")
279         if code:
280             # create dir for sources
281             self.node.mkdir(self.src_dir)
282
283             self.info(" Uploading code ")
284
285             dst = os.path.join(self.src_dir, "code")
286             self.node.upload(sources, dst, text = True)
287
288     def upload_stdin(self):
289         stdin = self.get("stdin")
290         if stdin:
291             # create dir for sources
292             self.info(" Uploading stdin ")
293
294             dst = os.path.join(self.app_home, "stdin")
295             self.node.upload(stdin, dst, text = True)
296
297     def install_dependencies(self):
298         depends = self.get("depends")
299         if depends:
300             self.info(" Installing dependencies %s" % depends)
301             self.node.install_packages(depends, home = self.app_home)
302
303     def build(self):
304         build = self.get("build")
305         if build:
306             self.info(" Building sources ")
307             
308             # create dir for build
309             self.node.mkdir(self.build_dir)
310
311             # Upload the command to a file, and execute asynchronously
312             self.upload_and_run(build, 
313                     "build.sh", "build_pid", 
314                     "build_out", "build_err")
315  
316     def install(self):
317         install = self.get("install")
318         if install:
319             self.info(" Installing sources ")
320
321             # Upload the command to a file, and execute asynchronously
322             self.upload_and_run(install, 
323                     "install.sh", "install_pid", 
324                     "install_out", "install_err")
325
326     def deploy(self):
327         # Wait until node is associated and deployed
328         node = self.node
329         if not node or node.state < ResourceState.READY:
330             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
331             self.ec.schedule(reschedule_delay, self.deploy)
332         else:
333             try:
334                 command = self.get("command") or ""
335                 self.info("Deploying command '%s' " % command)
336                 self.discover()
337                 self.provision()
338             except:
339                 self._state = ResourceState.FAILED
340                 raise
341
342             super(LinuxApplication, self).deploy()
343
344     def start(self):
345         command = self.get('command')
346         env = self.get('env')
347         stdin = 'stdin' if self.get('stdin') else None
348         stdout = 'stdout' if self.get('stdout') else 'stdout'
349         stderr = 'stderr' if self.get('stderr') else 'stderr'
350         sudo = self.get('sudo') or False
351         x11 = self.get('forwardX11') or False
352         failed = False
353
354         super(LinuxApplication, self).start()
355
356         if not command:
357             self.info("No command to start ")
358             self._state = ResourceState.FINISHED
359             return 
360     
361         self.info("Starting command '%s'" % command)
362
363         if x11:
364             if env:
365                 # Export environment
366                 environ = ""
367                 for var in env.split(" "):
368                     environ += ' %s ' % var
369
370                 command = "(" + environ + " ; " + command + ")"
371                 command = self.replace_paths(command)
372
373             # If the command requires X11 forwarding, we
374             # can't run it asynchronously
375             (out, err), proc = self.node.execute(command,
376                     sudo = sudo,
377                     stdin = stdin,
378                     forward_x11 = x11)
379
380             self._state = ResourceState.FINISHED
381
382             if proc.poll() and err:
383                 failed = True
384         else:
385             # Command was  previously uploaded, now run the remote
386             # bash file asynchronously
387             cmd = "bash ./app.sh"
388             (out, err), proc = self.node.run(cmd, self.app_home, 
389                 stdin = stdin, 
390                 stdout = stdout,
391                 stderr = stderr,
392                 sudo = sudo)
393
394             if proc.poll() and err:
395                 failed = True
396         
397             if not failed:
398                 pid, ppid = self.node.wait_pid(home = self.app_home)
399                 if pid: self._pid = int(pid)
400                 if ppid: self._ppid = int(ppid)
401
402             if not self.pid or not self.ppid:
403                 failed = True
404  
405             (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
406
407             if failed or out or chkerr:
408                 # check if execution errors occurred
409                 msg = " Failed to start command '%s' " % command
410                 out = out
411                 if err:
412                     err = err
413                 elif chkerr:
414                     err = chkerr
415
416                 self.error(msg, out, err)
417
418                 msg2 = " Setting state to Failed"
419                 self.debug(msg2)
420                 self._state = ResourceState.FAILED
421
422                 raise RuntimeError, msg
423
424     def stop(self):
425         command = self.get('command') or ''
426         state = self.state
427         
428         if state == ResourceState.STARTED:
429             self.info("Stopping command '%s'" % command)
430
431             (out, err), proc = self.node.kill(self.pid, self.ppid)
432
433             if out or err:
434                 # check if execution errors occurred
435                 msg = " Failed to STOP command '%s' " % self.get("command")
436                 self.error(msg, out, err)
437                 self._state = ResourceState.FAILED
438                 stopped = False
439             else:
440                 super(LinuxApplication, self).stop()
441
442     def release(self):
443         self.info("Releasing resource")
444
445         tear_down = self.get("tearDown")
446         if tear_down:
447             self.node.execute(tear_down)
448
449         self.stop()
450         if self.state == ResourceState.STOPPED:
451             super(LinuxApplication, self).release()
452     
453     @property
454     def state(self):
455         if self._state == ResourceState.STARTED:
456             # To avoid overwhelming the remote hosts and the local processor
457             # with too many ssh queries, the state is only requested
458             # every 'state_check_delay' .
459             if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
460                 # check if execution errors occurred
461                 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
462
463                 if out or err:
464                     if err.find("No such file or directory") >= 0 :
465                         # The resource is marked as started, but the
466                         # command was not yet executed
467                         return ResourceState.READY
468
469                     msg = " Failed to execute command '%s'" % self.get("command")
470                     self.error(msg, out, err)
471                     self._state = ResourceState.FAILED
472
473                 elif self.pid and self.ppid:
474                     status = self.node.status(self.pid, self.ppid)
475
476                     if status == sshfuncs.FINISHED:
477                         self._state = ResourceState.FINISHED
478
479
480                 self._last_state_check = strfnow()
481
482         return self._state
483
484     def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
485         dst = os.path.join(self.app_home, fname)
486         cmd = self.replace_paths(cmd)
487         self.node.upload(cmd, dst, text = True)
488
489         cmd = "bash ./%s" % fname
490         (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
491             pidfile = pidfile,
492             stdout = outfile, 
493             stderr = errfile, 
494             raise_on_error = True)
495
496     def replace_paths(self, command):
497         """
498         Replace all special path tags with shell-escaped actual paths.
499         """
500         def absolute_dir(d):
501             return d if d.startswith("/") else os.path.join("${HOME}", d)
502
503         return ( command
504             .replace("${SOURCES}", absolute_dir(self.src_dir))
505             .replace("${BUILD}", absolute_dir(self.build_dir))
506             .replace("${APP_HOME}", absolute_dir(self.app_home))
507             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
508             .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
509             )
510         
511     def valid_connection(self, guid):
512         # TODO: Validate!
513         return True
514         # XXX: What if it is connected to more than one node?
515         resources = self.find_resources(exact_tags = [tags.NODE])
516         self._node = resources[0] if len(resources) == 1 else None
517         return self._node
518