4f2b989fa1c54ef506521f95d523a85eef0758da
[nepi.git] / src / neco / resources / linux / application.py
1 from neco.execution.attribute import Attribute, Flags, Types
2 from neco.execution.trace import Trace, TraceAttr
3 from neco.execution.resource import ResourceManager, clsinit, ResourceState
4 from neco.resources.linux.node import LinuxNode
5 from neco.util import sshfuncs 
6
7 import logging
8 import os
9
# Delay handed to ec.schedule() when deploy() has to wait for its LinuxNode
# to become available / reach the READY state before retrying.
reschedule_delay = "0.5s"

# TODO: Resolve wildcards in commands!! 
13
@clsinit
class LinuxApplication(ResourceManager):
    """Resource manager for an application executed on a connected LinuxNode.

    Each application gets its own working directory, ``app-<guid>``, under
    the node's experiment home (see ``app_home``). Provisioning uploads
    sources/code, installs dependencies, runs the build/install scripts and,
    unless X11 forwarding is requested, uploads the command as ``app.sh`` so
    that :meth:`start` can launch it asynchronously.
    """
    _rtype = "LinuxApplication"

    @classmethod
    def _register_attributes(cls):
        """Declare the configurable attributes of this resource type."""
        command = Attribute("command", "Command to execute",
                flags=Flags.ExecReadOnly)
        forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
                flags=Flags.ExecReadOnly)
        env = Attribute("env", "Environment variables string for command execution",
                flags=Flags.ExecReadOnly)
        sudo = Attribute("sudo", "Run with root privileges",
                flags=Flags.ExecReadOnly)
        depends = Attribute("depends",
                "Space-separated list of packages required to run the application",
                flags=Flags.ExecReadOnly)
        sources = Attribute("sources",
                "Space-separated list of regular files to be deployed in the working "
                "path prior to building. Archives won't be expanded automatically.",
                flags=Flags.ExecReadOnly)
        code = Attribute("code",
                "Plain text source code to be uploaded to the server. It will be stored "
                "under ${SOURCES}/code",
                flags=Flags.ExecReadOnly)
        build = Attribute("build",
                "Build commands to execute after deploying the sources. "
                "Sources will be in the ${SOURCES} folder. "
                "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
                "Try to make the commands return with a nonzero exit code on error.\n"
                "Also, do not install any programs here, use the 'install' attribute. This will "
                "help keep the built files constrained to the build folder (which may "
                "not be the home folder), and will result in faster deployment. Also, "
                "make sure to clean up temporary files, to reduce bandwidth usage between "
                "nodes when transferring built packages.",
                flags=Flags.ReadOnly)
        install = Attribute("install",
                "Commands to transfer built files to their final destinations. "
                "Sources will be in the initial working folder, and a special "
                "tag ${SOURCES} can be used to reference the experiment's "
                "home folder (where the application commands will run).\n"
                "ALL sources and targets needed for execution must be copied there, "
                "if building has been enabled.\n"
                "That is, 'slave' nodes will not automatically get any source files. "
                "'slave' nodes don't get build dependencies either, so if you need "
                "make and other tools to install, be sure to provide them as "
                "actual dependencies instead.",
                flags=Flags.ReadOnly)
        stdin = Attribute("stdin", "Standard input", flags=Flags.ExecReadOnly)
        stdout = Attribute("stdout", "Standard output", flags=Flags.ExecReadOnly)
        stderr = Attribute("stderr", "Standard error", flags=Flags.ExecReadOnly)
        tear_down = Attribute("tearDown", "Bash script to be executed before "
                "releasing the resource",
                flags=Flags.ReadOnly)

        cls._register_attribute(command)
        cls._register_attribute(forward_x11)
        cls._register_attribute(env)
        cls._register_attribute(sudo)
        cls._register_attribute(depends)
        cls._register_attribute(sources)
        cls._register_attribute(code)
        cls._register_attribute(build)
        cls._register_attribute(install)
        cls._register_attribute(stdin)
        cls._register_attribute(stdout)
        cls._register_attribute(stderr)
        cls._register_attribute(tear_down)

    @classmethod
    def _register_traces(cls):
        """Declare the traces (output files) this resource type exposes."""
        stdout = Trace("stdout", "Standard output stream")
        stderr = Trace("stderr", "Standard error stream")
        buildlog = Trace("buildlog", "Output of the build process")

        cls._register_trace(stdout)
        cls._register_trace(stderr)
        cls._register_trace(buildlog)

    def __init__(self, ec, guid):
        super(LinuxApplication, self).__init__(ec, guid)
        # Filled in by start() from node.wait_pid() once app.sh is running.
        self._pid = None
        self._ppid = None
        # Per-application directory name, relative to the node's exp_home.
        self._home = "app-%s" % self.guid

        self._logger = logging.getLogger("LinuxApplication")

    def log_message(self, msg):
        """Prefix ``msg`` with this resource's guid and its node's hostname.

        The host is reported as ``None`` while no node is connected, so the
        method is safe to call before the node is attached.
        """
        node = self.node
        hostname = node.get("hostname") if node else None
        return " guid %d - host %s - %s " % (self.guid, hostname, msg)

    @property
    def node(self):
        """The first connected LinuxNode, or None if there is none."""
        node = self.get_connected(LinuxNode.rtype())
        if node:
            return node[0]
        return None

    @property
    def app_home(self):
        """Application working directory on the node (under exp_home)."""
        return os.path.join(self.node.exp_home, self._home)

    @property
    def src_dir(self):
        """Directory for uploaded/downloaded sources (``${SOURCES}``)."""
        return os.path.join(self.app_home, 'src')

    @property
    def build_dir(self):
        """Directory for build output (``${BUILD}``)."""
        return os.path.join(self.app_home, 'build')

    @property
    def pid(self):
        """Remote pid recorded by start(), or None."""
        return self._pid

    @property
    def ppid(self):
        """Remote parent pid recorded by start(), or None."""
        return self._ppid

    def trace(self, name, attr=TraceAttr.ALL, block=512, offset=0):
        """Retrieve trace ``name`` from the application home directory.

        :param name: trace file name, relative to ``app_home``.
        :param attr: which view of the trace to return:
            ``TraceAttr.PATH`` (remote path), ``TraceAttr.ALL`` (whole
            content), ``TraceAttr.STREAM`` (one ``block``-byte chunk, skipping
            ``offset`` blocks, read with ``dd``) or ``TraceAttr.SIZE`` (size
            in bytes as an int, via ``stat``).
        :returns: the requested value, or None when the trace is missing,
            cannot be read, or ``attr`` is not recognised.
        """
        self.info("Retrieving '%s' trace %s " % (name, attr))

        path = os.path.join(self.app_home, name)

        command = "(test -f %s && echo 'success') || echo 'error'" % path
        (out, err), proc = self.node.execute(command)

        if (err and proc.poll()) or "error" in out:
            msg = " Couldn't find trace %s " % name
            self.error(msg, out, err)
            return None

        if attr == TraceAttr.PATH:
            return path

        if attr == TraceAttr.ALL:
            (out, err), proc = self.node.check_output(self.app_home, name)

            if err and proc.poll():
                msg = " Couldn't read trace %s " % name
                self.error(msg, out, err)
                return None

            return out

        if attr == TraceAttr.STREAM:
            cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
        elif attr == TraceAttr.SIZE:
            cmd = "stat -c%%s %s " % path
        else:
            # Any other attribute has no command to run; report it instead
            # of failing with an unbound ``cmd``.
            msg = " Unknown attribute %s for trace %s " % (attr, name)
            self.error(msg, "", "")
            return None

        (out, err), proc = self.node.execute(cmd)

        if err and proc.poll():
            msg = " Couldn't find trace %s " % name
            self.error(msg, out, err)
            return None

        if attr == TraceAttr.SIZE:
            out = int(out.strip())

        return out

    def provision(self, filters=None):
        """Prepare the node: create app_home, deploy sources/code, install
        dependencies, build, install, and upload the command as ``app.sh``.

        The command is only pre-uploaded when X11 forwarding is disabled;
        with forwardX11 it is executed directly by start().
        """
        # create home dir for application
        self.node.mkdir(self.app_home)

        self.upload_sources()
        self.upload_code()
        self.install_dependencies()
        self.build()
        self.install()

        command = self.get("command")
        x11 = self.get("forwardX11")
        if not x11 and command:
            self.info("Uploading command '%s'" % command)

            # TODO: missing set PATH and PYTHONPATH!!

            # The command runs asynchronously, so pre-upload it to the
            # app.sh file in the remote host.
            dst = os.path.join(self.app_home, "app.sh")
            command = self.replace_paths(command)
            self.node.upload(command, dst, text=True)

        super(LinuxApplication, self).provision()

    def upload_sources(self):
        """Deploy the entries of the ``sources`` attribute into ``src_dir``.

        Entries are whitespace separated. ``http://`` / ``https://`` URLs are
        downloaded on the node with wget (via :meth:`upload_and_run`); every
        other entry is uploaded from the local host.
        """
        # TODO: check if sources need to be uploaded and upload them
        sources = self.get("sources")
        if not sources:
            return

        self.info(" Uploading sources ")

        # create dir for sources
        self.node.mkdir(self.src_dir)

        # split() (instead of split(' ')) drops the empty entries that
        # repeated whitespace would otherwise produce.
        entries = sources.split()

        # Match the URL scheme, not just an "http" name prefix, so a local
        # file whose name starts with "http" is still uploaded.
        url_schemes = ("http://", "https://")
        http_sources = [s for s in entries if s.startswith(url_schemes)]
        local_sources = [s for s in entries if not s.startswith(url_schemes)]

        # Download http sources
        if http_sources:
            cmd = " wget -c --directory-prefix=${SOURCES} "
            verif = ""

            for source in http_sources:
                cmd += " %s " % (source)
                verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)

            # Wget output goes to stderr :S
            cmd += " 2> /dev/null ; "

            # Add verification: ls fails for any file that was not fetched
            cmd += " %s " % verif

            # Upload the command to a file, and execute asynchronously
            self.upload_and_run(cmd,
                    "http_sources.sh", "http_sources_pid",
                    "http_sources_out", "http_sources_err")

        if local_sources:
            self.node.upload(local_sources, self.src_dir)

    def upload_code(self):
        """Upload the ``code`` attribute as text to ``${SOURCES}/code``."""
        code = self.get("code")
        if code:
            # create dir for sources
            self.node.mkdir(self.src_dir)

            self.info(" Uploading code ")

            dst = os.path.join(self.src_dir, "code")
            self.node.upload(code, dst, text=True)

    def install_dependencies(self):
        """Install the packages listed in ``depends`` on the node."""
        depends = self.get("depends")
        if depends:
            self.info(" Installing dependencies %s" % depends)
            self.node.install_packages(depends, home=self.app_home)

    def build(self):
        """Run the ``build`` commands as build.sh inside app_home."""
        build = self.get("build")
        if build:
            self.info(" Building sources ")

            # create dir for build
            self.node.mkdir(self.build_dir)

            # Upload the command to a file, and execute asynchronously
            self.upload_and_run(build,
                    "build.sh", "build_pid",
                    "build_out", "build_err")

    def install(self):
        """Run the ``install`` commands as install.sh inside app_home."""
        install = self.get("install")
        if install:
            self.info(" Installing sources ")

            # Upload the command to a file, and execute asynchronously
            self.upload_and_run(install,
                    "install.sh", "install_pid",
                    "install_out", "install_err")

    def deploy(self):
        """Discover and provision the application once its node is READY.

        While no node is connected, or the node is not READY yet, the call
        re-schedules itself after ``reschedule_delay``. Any failure during
        discovery/provisioning marks the resource FAILED and is re-raised.
        """
        node = self.node
        if not node or node.state < ResourceState.READY:
            node_state = node.state if node else None
            self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % node_state)
            self.ec.schedule(reschedule_delay, self.deploy)
            return

        try:
            command = self.get("command") or ""
            self.info(" Deploying command '%s' " % command)
            self.discover()
            self.provision()
        except BaseException:
            # Mark the failure, then let the original exception propagate.
            self._state = ResourceState.FAILED
            raise

        super(LinuxApplication, self).deploy()

    def start(self):
        """Launch the application command on the node.

        With forwardX11 the command is executed synchronously through
        ``node.execute``; otherwise the pre-uploaded ``app.sh`` is launched
        with ``node.run`` and its pid/ppid are recorded. Content found in the
        remote ``stderr`` file is treated as a start failure.

        :raises RuntimeError: if the command could not be started.
        """
        command = self.get("command")
        env = self.get("env")
        stdin = 'stdin' if self.get("stdin") else None
        # Output is always captured to these files inside app_home; the
        # 'stderr' file is inspected below to detect start failures.
        stdout = 'stdout'
        stderr = 'stderr'
        sudo = self.get('sudo') or False
        x11 = self.get("forwardX11") or False
        failed = False

        super(LinuxApplication, self).start()

        if not command:
            self.info("No command to start ")
            self._state = ResourceState.FINISHED
            return

        self.info("Starting command '%s'" % command)

        if x11:
            # If the command requires X11 forwarding, we
            # can't run it asynchronously
            (out, err), proc = self.node.execute(command,
                    sudo=sudo,
                    stdin=stdin,
                    stdout=stdout,
                    stderr=stderr,
                    env=env,
                    forward_x11=x11)

            if proc.poll() and err:
                failed = True
        else:
            # Command was previously uploaded, now run the remote
            # bash file asynchronously.
            # NOTE(review): ``env`` is not passed to node.run, so it does not
            # apply to the asynchronous launch -- confirm whether it should.
            cmd = "bash ./app.sh"
            (out, err), proc = self.node.run(cmd, self.app_home,
                    stdin=stdin,
                    stdout=stdout,
                    stderr=stderr,
                    sudo=sudo)

            if proc.poll() and err:
                failed = True

            if not failed:
                pid, ppid = self.node.wait_pid(home=self.app_home)
                if pid:
                    self._pid = int(pid)
                if ppid:
                    self._ppid = int(ppid)

            if not self.pid or not self.ppid:
                failed = True

        (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')

        if failed or out or chkerr:
            # check if execution errors occurred
            msg = " Failed to start command '%s' " % command
            # Prefer the launch error; fall back to the stderr file content.
            if not err and chkerr:
                err = chkerr

            self.error(msg, out, err)

            self.debug(" Setting state to Failed")
            self._state = ResourceState.FAILED

            raise RuntimeError(msg)

    def stop(self):
        """Kill the remote process if the application is STARTED.

        Marks the resource FAILED if the kill produced any output.
        """
        if self.state != ResourceState.STARTED:
            return

        command = self.get("command")
        self.info("Stopping command %s" % command)

        (out, err), proc = self.node.kill(self.pid, self.ppid)

        if out or err:
            # check if execution errors occurred
            msg = " Failed to STOP command '%s' " % command
            self.error(msg, out, err)
            self._state = ResourceState.FAILED
        else:
            super(LinuxApplication, self).stop()

    def release(self):
        """Run ``tearDown`` (if any), stop the app, and release if STOPPED."""
        self.info("Releasing resource")

        tear_down = self.get("tearDown")
        if tear_down:
            self.node.execute(tear_down)

        self.stop()
        if self.state == ResourceState.STOPPED:
            super(LinuxApplication, self).release()

    @property
    def state(self):
        """Current state, refreshed from the node while STARTED.

        While STARTED, the remote ``stderr`` file is checked: a missing file
        reports READY (command not executed yet), any other output marks the
        resource FAILED, and a FINISHED process status marks it FINISHED.
        """
        if self._state == ResourceState.STARTED:
            (out, err), proc = self.node.check_output(self.app_home, 'stderr')

            if out or err:
                if err and "No such file or directory" in err:
                    # The resource is marked as started, but the
                    # command was not yet executed
                    return ResourceState.READY

                # check if execution errors occurred
                msg = " Failed to execute command '%s'" % self.get("command")
                self.error(msg, out, err)
                self._state = ResourceState.FAILED

            elif self.pid and self.ppid:
                status = self.node.status(self.pid, self.ppid)

                if status == sshfuncs.FINISHED:
                    self._state = ResourceState.FINISHED

        return self._state

    def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
        """Upload ``cmd`` (with path tags replaced) as ``fname`` into app_home
        and run it with ``bash`` via ``node.run_and_wait``.

        ``raise_on_error=True`` is passed to run_and_wait; presumably it
        raises on failure -- confirm against LinuxNode.
        """
        dst = os.path.join(self.app_home, fname)
        cmd = self.replace_paths(cmd)
        self.node.upload(cmd, dst, text=True)

        cmd = "bash ./%s" % fname
        self.node.run_and_wait(cmd, self.app_home,
            pidfile=pidfile,
            stdout=outfile,
            stderr=errfile,
            raise_on_error=True)

    def replace_paths(self, command):
        """Replace the special path tags in ``command`` with actual paths.

        Supported tags: ``${SOURCES}``, ``${BUILD}``, ``${APP_HOME}``,
        ``${NODE_HOME}`` and ``${EXP_HOME}``. Relative directories are made
        absolute by prefixing ``${HOME}``. The paths are NOT shell-escaped.
        """
        def absolute_dir(d):
            return d if d.startswith("/") else os.path.join("${HOME}", d)

        return (command
            .replace("${SOURCES}", absolute_dir(self.src_dir))
            .replace("${BUILD}", absolute_dir(self.build_dir))
            .replace("${APP_HOME}", absolute_dir(self.app_home))
            .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
            # Same treatment as the other tags: app_home is derived from
            # exp_home, so a relative exp_home must be anchored the same way.
            .replace("${EXP_HOME}", absolute_dir(self.node.exp_home)))

    def valid_connection(self, guid):
        """Accept every connection for now."""
        # TODO: Validate! e.g. reject connecting to more than one node.
        return True

    def hash_app(self):
        """Return an MD5 hex digest identifying this application's setup.

        Intended to determine whether the home directory should be cleaned
        or not. The digest covers the values of the attributes below.
        """
        import hashlib

        attr_names = ("command", "forwardX11", "env", "sudo", "depends",
                "sources", "code", "build", "install", "stdin", "stdout",
                "stderr", "tearDown")
        # NUL separator so that e.g. ("ab", "c") and ("a", "bc") differ.
        skey = "\x00".join(str(self.get(name)) for name in attr_names)
        if not isinstance(skey, bytes):
            skey = skey.encode("utf-8")
        return hashlib.md5(skey).hexdigest()
486