Added example for Linux Application using CCNx
[nepi.git] / src / neco / resources / linux / application.py
1 from neco.execution.attribute import Attribute, Flags, Types
2 from neco.execution.trace import Trace, TraceAttr
3 from neco.execution.resource import ResourceManager, clsinit, ResourceState
4 from neco.resources.linux.node import LinuxNode
5 from neco.util import sshfuncs 
6
7 import logging
8 import os
9
# Delay passed to the scheduler when deploy() has to be re-attempted
# because the node is not READY yet
reschedule_delay = "0.5s"
11
12 # TODO: Resolve wildcards in commands!! 
13
14 @clsinit
15 class LinuxApplication(ResourceManager):
16     _rtype = "LinuxApplication"
17
18     @classmethod
19     def _register_attributes(cls):
20         command = Attribute("command", "Command to execute", 
21                 flags = Flags.ExecReadOnly)
22         forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
23                 flags = Flags.ExecReadOnly)
24         env = Attribute("env", "Environment variables string for command execution",
25                 flags = Flags.ExecReadOnly)
26         sudo = Attribute("sudo", "Run with root privileges", 
27                 flags = Flags.ExecReadOnly)
28         depends = Attribute("depends", 
29                 "Space-separated list of packages required to run the application",
30                 flags = Flags.ExecReadOnly)
31         sources = Attribute("sources", 
32                 "Space-separated list of regular files to be deployed in the working "
33                 "path prior to building. Archives won't be expanded automatically.",
34                 flags = Flags.ExecReadOnly)
35         code = Attribute("code", 
36                 "Plain text source code to be uploaded to the server. It will be stored "
37                 "under ${SOURCES}/code",
38                 flags = Flags.ExecReadOnly)
39         build = Attribute("build", 
40                 "Build commands to execute after deploying the sources. "
41                 "Sources will be in the ${SOURCES} folder. "
42                 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
43                 "Try to make the commands return with a nonzero exit code on error.\n"
44                 "Also, do not install any programs here, use the 'install' attribute. This will "
45                 "help keep the built files constrained to the build folder (which may "
46                 "not be the home folder), and will result in faster deployment. Also, "
47                 "make sure to clean up temporary files, to reduce bandwidth usage between "
48                 "nodes when transferring built packages.",
49                 flags = Flags.ReadOnly)
50         install = Attribute("install", 
51                 "Commands to transfer built files to their final destinations. "
52                 "Sources will be in the initial working folder, and a special "
53                 "tag ${SOURCES} can be used to reference the experiment's "
54                 "home folder (where the application commands will run).\n"
55                 "ALL sources and targets needed for execution must be copied there, "
56                 "if building has been enabled.\n"
57                 "That is, 'slave' nodes will not automatically get any source files. "
58                 "'slave' nodes don't get build dependencies either, so if you need "
59                 "make and other tools to install, be sure to provide them as "
60                 "actual dependencies instead.",
61                 flags = Flags.ReadOnly)
62         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
63         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
64         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
65         tear_down = Attribute("tearDown", "Bash script to be executed before "
66                 "releasing the resource", 
67                 flags = Flags.ReadOnly)
68
69         cls._register_attribute(command)
70         cls._register_attribute(forward_x11)
71         cls._register_attribute(env)
72         cls._register_attribute(sudo)
73         cls._register_attribute(depends)
74         cls._register_attribute(sources)
75         cls._register_attribute(code)
76         cls._register_attribute(build)
77         cls._register_attribute(install)
78         cls._register_attribute(stdin)
79         cls._register_attribute(stdout)
80         cls._register_attribute(stderr)
81         cls._register_attribute(tear_down)
82
83     @classmethod
84     def _register_traces(cls):
85         stdout = Trace("stdout", "Standard output stream")
86         stderr = Trace("stderr", "Standard error stream")
87         buildlog = Trace("buildlog", "Output of the build process")
88
89         cls._register_trace(stdout)
90         cls._register_trace(stderr)
91         cls._register_trace(buildlog)
92
93     def __init__(self, ec, guid):
94         super(LinuxApplication, self).__init__(ec, guid)
95         self._pid = None
96         self._ppid = None
97         self._home = "app-%s" % self.guid
98
99         self._logger = logging.getLogger("LinuxApplication")
100     
101     def log_message(self, msg):
102         return " guid %d - host %s - %s " % (self.guid, 
103                 self.node.get("hostname"), msg)
104
105     @property
106     def node(self):
107         node = self.get_connected(LinuxNode.rtype())
108         if node: return node[0]
109         return None
110
111     @property
112     def app_home(self):
113         return os.path.join(self.node.exp_home, self._home)
114
115     @property
116     def src_dir(self):
117         return os.path.join(self.app_home, 'src')
118
119     @property
120     def build_dir(self):
121         return os.path.join(self.app_home, 'build')
122
    @property
    def pid(self):
        """Pid recorded by start() from node.wait_pid(); None until then."""
        return self._pid
126
    @property
    def ppid(self):
        """Parent pid recorded by start() from node.wait_pid(); None until then."""
        return self._ppid
130
131     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
132         self.info("Retrieving '%s' trace %s " % (name, attr))
133
134         path = os.path.join(self.app_home, name)
135         
136         command = "(test -f %s && echo 'success') || echo 'error'" % path
137         (out, err), proc = self.node.execute(command)
138
139         if (err and proc.poll()) or out.find("error") != -1:
140             msg = " Couldn't find trace %s " % name
141             self.error(msg, out, err)
142             return None
143     
144         if attr == TraceAttr.PATH:
145             return path
146
147         if attr == TraceAttr.ALL:
148             (out, err), proc = self.node.check_output(self.app_home, name)
149             
150             if err and proc.poll():
151                 msg = " Couldn't read trace %s " % name
152                 self.error(msg, out, err)
153                 return None
154
155             return out
156
157         if attr == TraceAttr.STREAM:
158             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
159         elif attr == TraceAttr.SIZE:
160             cmd = "stat -c%%s %s " % path
161
162         (out, err), proc = self.node.execute(cmd)
163
164         if err and proc.poll():
165             msg = " Couldn't find trace %s " % name
166             self.error(msg, out, err)
167             return None
168         
169         if attr == TraceAttr.SIZE:
170             out = int(out.strip())
171
172         return out
173             
174     def provision(self, filters = None):
175         # create home dir for application
176         self.node.mkdir(self.app_home)
177
178         # upload sources
179         self.upload_sources()
180
181         # upload code
182         self.upload_code()
183
184         # upload stdin
185         self.upload_stdin()
186
187         # install dependencies
188         self.install_dependencies()
189
190         # build
191         self.build()
192
193         # Install
194         self.install()
195
196         command = self.get("command")
197         x11 = self.get("forwardX11")
198         if not x11 and command:
199             self.info("Uploading command '%s'" % command)
200
201             # Export environment
202             environ = ""
203             env = self.get("env") or ""
204             for var in env.split(" "):
205                 environ += 'export %s\n' % var
206
207             command = environ + command
208
209             # If the command runs asynchronous, pre upload the command 
210             # to the app.sh file in the remote host
211             dst = os.path.join(self.app_home, "app.sh")
212             command = self.replace_paths(command)
213             self.node.upload(command, dst, text = True)
214
215         super(LinuxApplication, self).provision()
216
217     def upload_sources(self):
218         # TODO: check if sources need to be uploaded and upload them
219         sources = self.get("sources")
220         if sources:
221             self.info(" Uploading sources ")
222
223             # create dir for sources
224             self.node.mkdir(self.src_dir)
225
226             sources = sources.split(' ')
227
228             http_sources = list()
229             for source in list(sources):
230                 if source.startswith("http") or source.startswith("https"):
231                     http_sources.append(source)
232                     sources.remove(source)
233
234             # Download http sources
235             if http_sources:
236                 cmd = " wget -c --directory-prefix=${SOURCES} "
237                 verif = ""
238
239                 for source in http_sources:
240                     cmd += " %s " % (source)
241                     verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
242                 
243                 # Wget output goes to stderr :S
244                 cmd += " 2> /dev/null ; "
245
246                 # Add verification
247                 cmd += " %s " % verif
248
249                 # Upload the command to a file, and execute asynchronously
250                 self.upload_and_run(cmd, 
251                         "http_sources.sh", "http_sources_pid", 
252                         "http_sources_out", "http_sources_err")
253             if sources:
254                 self.node.upload(sources, self.src_dir)
255
256     def upload_code(self):
257         code = self.get("code")
258         if code:
259             # create dir for sources
260             self.node.mkdir(self.src_dir)
261
262             self.info(" Uploading code ")
263
264             dst = os.path.join(self.src_dir, "code")
265             self.node.upload(sources, dst, text = True)
266
267     def upload_stdin(self):
268         stdin = self.get("stdin")
269         if stdin:
270             # create dir for sources
271             self.info(" Uploading stdin ")
272
273             dst = os.path.join(self.app_home, "stdin")
274             self.node.upload(stdin, dst, text = True)
275
276     def install_dependencies(self):
277         depends = self.get("depends")
278         if depends:
279             self.info(" Installing dependencies %s" % depends)
280             self.node.install_packages(depends, home = self.app_home)
281
282     def build(self):
283         build = self.get("build")
284         if build:
285             self.info(" Building sources ")
286             
287             # create dir for build
288             self.node.mkdir(self.build_dir)
289
290             # Upload the command to a file, and execute asynchronously
291             self.upload_and_run(build, 
292                     "build.sh", "build_pid", 
293                     "build_out", "build_err")
294  
295     def install(self):
296         install = self.get("install")
297         if install:
298             self.info(" Installing sources ")
299
300             # Upload the command to a file, and execute asynchronously
301             self.upload_and_run(install, 
302                     "install.sh", "install_pid", 
303                     "install_out", "install_err")
304
305     def deploy(self):
306         # Wait until node is associated and deployed
307         node = self.node
308         if not node or node.state < ResourceState.READY:
309             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
310             self.ec.schedule(reschedule_delay, self.deploy)
311         else:
312             try:
313                 command = self.get("command") or ""
314                 self.info(" Deploying command '%s' " % command)
315                 self.discover()
316                 self.provision()
317             except:
318                 self._state = ResourceState.FAILED
319                 raise
320
321             super(LinuxApplication, self).deploy()
322
323     def start(self):
324         command = self.get('command')
325         env = self.get('env')
326         stdin = 'stdin' if self.get('stdin') else None
327         stdout = 'stdout' if self.get('stdout') else 'stdout'
328         stderr = 'stderr' if self.get('stderr') else 'stderr'
329         sudo = self.get('sudo') or False
330         x11 = self.get('forwardX11') or False
331         failed = False
332
333         super(LinuxApplication, self).start()
334
335         if not command:
336             self.info("No command to start ")
337             self._state = ResourceState.FINISHED
338             return 
339     
340         self.info("Starting command '%s'" % command)
341
342         if x11:
343             if env:
344                 # Export environment
345                 environ = ""
346                 for var in env.split(" "):
347                     environ += ' %s ' % var
348
349                 command = "(" + environ + " ; " + command + ")"
350                 command = self.replace_paths(command)
351
352             # If the command requires X11 forwarding, we
353             # can't run it asynchronously
354             (out, err), proc = self.node.execute(command,
355                     sudo = sudo,
356                     stdin = stdin,
357                     forward_x11 = x11)
358
359             self._state = ResourceState.FINISHED
360
361             if proc.poll() and err:
362                 failed = True
363         else:
364             # Command was  previously uploaded, now run the remote
365             # bash file asynchronously
366             cmd = "bash ./app.sh"
367             (out, err), proc = self.node.run(cmd, self.app_home, 
368                 stdin = stdin, 
369                 stdout = stdout,
370                 stderr = stderr,
371                 sudo = sudo)
372
373             if proc.poll() and err:
374                 failed = True
375         
376             if not failed:
377                 pid, ppid = self.node.wait_pid(home = self.app_home)
378                 if pid: self._pid = int(pid)
379                 if ppid: self._ppid = int(ppid)
380
381             if not self.pid or not self.ppid:
382                 failed = True
383  
384         (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
385
386         if failed or out or chkerr:
387             # check if execution errors occurred
388             msg = " Failed to start command '%s' " % command
389             out = out
390             if err:
391                 err = err
392             elif chkerr:
393                 err = chkerr
394
395             self.error(msg, out, err)
396
397             msg2 = " Setting state to Failed"
398             self.debug(msg2)
399             self._state = ResourceState.FAILED
400
401             raise RuntimeError, msg
402
403     def stop(self):
404         state = self.state
405         if state == ResourceState.STARTED:
406             self.info("Stopping command %s" % command)
407
408             (out, err), proc = self.node.kill(self.pid, self.ppid)
409
410             if out or err:
411                 # check if execution errors occurred
412                 msg = " Failed to STOP command '%s' " % self.get("command")
413                 self.error(msg, out, err)
414                 self._state = ResourceState.FAILED
415                 stopped = False
416             else:
417                 super(LinuxApplication, self).stop()
418
419     def release(self):
420         self.info("Releasing resource")
421
422         tear_down = self.get("tearDown")
423         if tear_down:
424             self.node.execute(tear_down)
425
426         self.stop()
427         if self.state == ResourceState.STOPPED:
428             super(LinuxApplication, self).release()
429     
430     @property
431     def state(self):
432         if self._state == ResourceState.STARTED:
433             (out, err), proc = self.node.check_output(self.app_home, 'stderr')
434
435             if out or err:
436                 if err.find("No such file or directory") >= 0 :
437                     # The resource is marked as started, but the
438                     # command was not yet executed
439                     return ResourceState.READY
440
441                 # check if execution errors occurred
442                 msg = " Failed to execute command '%s'" % self.get("command")
443                 self.error(msg, out, err)
444                 self._state = ResourceState.FAILED
445
446             elif self.pid and self.ppid:
447                 status = self.node.status(self.pid, self.ppid)
448
449                 if status == sshfuncs.FINISHED:
450                     self._state = ResourceState.FINISHED
451
452         return self._state
453
454     def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
455         dst = os.path.join(self.app_home, fname)
456         cmd = self.replace_paths(cmd)
457         self.node.upload(cmd, dst, text = True)
458
459         cmd = "bash ./%s" % fname
460         (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
461             pidfile = pidfile,
462             stdout = outfile, 
463             stderr = errfile, 
464             raise_on_error = True)
465
466     def replace_paths(self, command):
467         """
468         Replace all special path tags with shell-escaped actual paths.
469         """
470         def absolute_dir(d):
471             return d if d.startswith("/") else os.path.join("${HOME}", d)
472
473         return ( command
474             .replace("${SOURCES}", absolute_dir(self.src_dir))
475             .replace("${BUILD}", absolute_dir(self.build_dir))
476             .replace("${APP_HOME}", absolute_dir(self.app_home))
477             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
478             .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
479             )
480         
481     def valid_connection(self, guid):
482         # TODO: Validate!
483         return True
484         # XXX: What if it is connected to more than one node?
485         resources = self.find_resources(exact_tags = [tags.NODE])
486         self._node = resources[0] if len(resources) == 1 else None
487         return self._node
488