Added unit tests for linux application
[nepi.git] / src / neco / resources / linux / application.py
1 from neco.execution.attribute import Attribute, Flags, Types
2 from neco.execution.trace import Trace, TraceAttr
3 from neco.execution.resource import ResourceManager, clsinit, ResourceState
4 from neco.resources.linux.node import LinuxNode
5 from neco.util import sshfuncs 
6
7 import logging
8 import os
9
# Delay handed to ``self.ec.schedule`` when ``LinuxApplication.deploy``
# reschedules itself because its node is not connected or not yet READY.
reschedule_delay = "0.5s"

# TODO: Resolve wildcards in commands!! 
13
@clsinit
class LinuxApplication(ResourceManager):
    """ Resource manager that deploys and runs a shell command on a
    connected LinuxNode.

    Application files live under ``<node exp_dir>/app-<guid>`` (``app_home``),
    with ``src`` and ``build`` sub-folders. Provisioning uploads sources and
    code, installs dependencies, runs the build and install commands and,
    unless X11 forwarding is requested, pre-uploads the command as
    ``app.sh`` so that ``start`` can launch it in the background.
    """
    _rtype = "LinuxApplication"

    @classmethod
    def _register_attributes(cls):
        """ Declares the configurable attributes of the application. """
        command = Attribute("command", "Command to execute", 
                flags = Flags.ExecReadOnly)
        forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
                flags = Flags.ExecReadOnly)
        env = Attribute("env", "Environment variables string for command execution",
                flags = Flags.ExecReadOnly)
        sudo = Attribute("sudo", "Run with root privileges", 
                flags = Flags.ExecReadOnly)
        depends = Attribute("depends", 
                "Space-separated list of packages required to run the application",
                flags = Flags.ExecReadOnly)
        sources = Attribute("sources", 
                "Space-separated list of regular files to be deployed in the working "
                "path prior to building. Archives won't be expanded automatically.",
                flags = Flags.ExecReadOnly)
        code = Attribute("code", 
                "Plain text source code to be uploaded to the server. It will be stored "
                "under ${SOURCES}/code",
                flags = Flags.ExecReadOnly)
        build = Attribute("build", 
                "Build commands to execute after deploying the sources. "
                "Sources will be in the ${SOURCES} folder. "
                "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
                "Try to make the commands return with a nonzero exit code on error.\n"
                "Also, do not install any programs here, use the 'install' attribute. This will "
                "help keep the built files constrained to the build folder (which may "
                "not be the home folder), and will result in faster deployment. Also, "
                "make sure to clean up temporary files, to reduce bandwidth usage between "
                "nodes when transferring built packages.",
                flags = Flags.ReadOnly)
        install = Attribute("install", 
                "Commands to transfer built files to their final destinations. "
                "Sources will be in the initial working folder, and a special "
                "tag ${SOURCES} can be used to reference the experiment's "
                "home folder (where the application commands will run).\n"
                "ALL sources and targets needed for execution must be copied there, "
                "if building has been enabled.\n"
                "That is, 'slave' nodes will not automatically get any source files. "
                "'slave' nodes don't get build dependencies either, so if you need "
                "make and other tools to install, be sure to provide them as "
                "actual dependencies instead.",
                flags = Flags.ReadOnly)
        stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
        stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
        stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
        tear_down = Attribute("tearDown", "Bash script to be executed before "
                "releasing the resource", 
                flags = Flags.ReadOnly)

        cls._register_attribute(command)
        cls._register_attribute(forward_x11)
        cls._register_attribute(env)
        cls._register_attribute(sudo)
        cls._register_attribute(depends)
        cls._register_attribute(sources)
        cls._register_attribute(code)
        cls._register_attribute(build)
        cls._register_attribute(install)
        cls._register_attribute(stdin)
        cls._register_attribute(stdout)
        cls._register_attribute(stderr)
        cls._register_attribute(tear_down)

    @classmethod
    def _register_traces(cls):
        """ Declares the traces that can be retrieved with ``trace``. """
        stdout = Trace("stdout", "Standard output stream")
        stderr = Trace("stderr", "Standard error stream")
        # NOTE(review): build() writes its output to "build_out"/"build_err",
        # so trace("buildlog") will not find a file with this name - confirm.
        buildlog = Trace("buildlog", "Output of the build process")

        cls._register_trace(stdout)
        cls._register_trace(stderr)
        cls._register_trace(buildlog)

    def __init__(self, ec, guid):
        super(LinuxApplication, self).__init__(ec, guid)
        self._pid = None
        self._ppid = None
        # Per-application folder name, relative to the node's exp_dir
        self._home = "app-%s" % self.guid

        self._logger = logging.getLogger("LinuxApplication")
    
    def log_message(self, msg):
        """ Prefixes ``msg`` with the resource guid and the node hostname. """
        return " guid %d - host %s - %s " % (self.guid, 
                self.node.get("hostname"), msg)

    @property
    def node(self):
        """ The first connected LinuxNode, or None if none is connected. """
        nodes = self.get_connected(LinuxNode.rtype())
        if nodes:
            return nodes[0]
        return None

    @property
    def app_home(self):
        """ Remote folder of this application: ``<node exp_dir>/app-<guid>``. """
        return os.path.join(self.node.exp_dir, self._home)

    @property
    def src_dir(self):
        """ Remote folder where sources and code are deployed. """
        return os.path.join(self.app_home, 'src')

    @property
    def build_dir(self):
        """ Remote folder created for build products. """
        return os.path.join(self.app_home, 'build')

    @property
    def pid(self):
        """ Pid recorded when the command was started in background, or None. """
        return self._pid

    @property
    def ppid(self):
        """ Parent pid recorded when the command was started in background, or None. """
        return self._ppid

    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """ Retrieves information about the trace file ``name`` stored in
        ``app_home`` on the node.

        :param name: trace file name, relative to ``app_home``
        :param attr: TraceAttr.PATH -> remote path of the file;
            TraceAttr.ALL -> whole file contents;
            TraceAttr.STREAM -> one block of ``block`` bytes, after skipping
            ``offset`` blocks (via ``dd``);
            TraceAttr.SIZE -> file size in bytes, as an int (via ``stat``).
        :param block: block size in bytes, used by TraceAttr.STREAM
        :param offset: number of blocks to skip, used by TraceAttr.STREAM
        :returns: the requested value, or None when the file does not exist,
            a remote command fails, or ``attr`` is not supported.
        """
        self.info("Retrieving '%s' trace %s " % (name, attr))

        path = os.path.join(self.app_home, name)
        
        cmd = "(test -f %s && echo 'success') || echo 'error'" % path
        (out, err), proc = self.node.execute(cmd)

        if (err and proc.poll()) or out.find("error") != -1:
            msg = " Couldn't find trace %s " % name
            self.error(msg, out, err)
            return None
    
        if attr == TraceAttr.PATH:
            return path

        if attr == TraceAttr.ALL:
            (out, err), proc = self.node.check_output(self.app_home, name)
            
            if err and proc.poll():
                msg = " Couldn't read trace %s " % name
                self.error(msg, out, err)
                return None

            return out

        if attr == TraceAttr.STREAM:
            cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
        elif attr == TraceAttr.SIZE:
            # '%%s' renders as stat's own '%s' (size in bytes) format
            cmd = "stat -c%%s %s " % path
        else:
            # Without this guard the existence check above would be re-run
            # and its 'success' output returned as if it were trace data.
            msg = " Unsupported trace attribute %s for trace %s " % (attr, name)
            self.error(msg, "", "")
            return None

        (out, err), proc = self.node.execute(cmd)

        if err and proc.poll():
            msg = " Couldn't find trace %s " % name
            self.error(msg, out, err)
            return None
        
        if attr == TraceAttr.SIZE:
            out = int(out.strip())

        return out
            
    def provision(self, filters = None):
        """ Prepares the application on the node.

        Creates ``app_home``, uploads sources and code, installs
        dependencies, runs the build and install commands and, unless
        ``forwardX11`` is set, uploads the command (with path tags replaced)
        to ``app_home/app.sh`` so ``start`` can run it in background.

        :param filters: currently unused (not forwarded to the base class).
        """
        # create home dir for application
        self.node.mkdir(self.app_home)

        # upload sources
        self.upload_sources()

        # upload code
        self.upload_code()

        # install dependencies
        self.install_dependencies()

        # build
        self.build()

        # Install
        self.install()

        command = self.replace_paths(self.get("command"))
        x11 = self.get("forwardX11") or False
        if not x11:
            self.info("Uploading command '%s'" % command)
            
            # If the command runs asynchronous, pre upload the command 
            # to the app.sh file in the remote host
            dst = os.path.join(self.app_home, "app.sh")
            self.node.upload(command, dst, text = True)

        super(LinuxApplication, self).provision()

    def upload_sources(self):
        """ Deploys the whitespace-separated ``sources`` into ``src_dir``.

        Entries starting with ``http://`` or ``https://`` are downloaded on
        the node with wget; all other entries are uploaded as local files.
        """
        # TODO: check if sources need to be uploaded and upload them
        sources = self.get("sources")
        if not sources:
            return

        self.info(" Uploading sources ")

        # create dir for sources
        self.node.mkdir(self.src_dir)

        # split() rather than split(' ') so repeated spaces don't produce
        # empty entries.
        http_sources = []
        local_sources = []
        for source in sources.split():
            # Match the URL scheme, not just the "http" prefix, so local
            # files such as "httpd.tgz" are not mistaken for URLs.
            if source.startswith(("http://", "https://")):
                http_sources.append(source)
            else:
                local_sources.append(source)

        # Download http sources
        for source in http_sources:
            dst = os.path.join(self.src_dir, source.split("/")[-1])
            # TODO: Check if the tar.gz is already downloaded using a hash
            # and don't download twice !!
            # '-O' sets the output document; '-o' would only redirect wget's log.
            command = "wget -O %s %s" % (dst, source)
            self.node.execute(command)

        if local_sources:
            self.node.upload(local_sources, self.src_dir)

    def upload_code(self):
        """ Uploads the plain-text ``code`` attribute to ``src_dir/code``. """
        code = self.get("code")
        if code:
            # create dir for sources
            self.node.mkdir(self.src_dir)

            self.info(" Uploading code ")

            dst = os.path.join(self.src_dir, "code")
            self.node.upload(code, dst, text = True)

    def install_dependencies(self):
        """ Installs the ``depends`` packages on the node, if any. """
        depends = self.get("depends")
        if depends:
            self.info(" Installing dependencies %s" % depends)
            self.node.install_packages(depends, home = self.app_home)

    def build(self):
        """ Runs the ``build`` command (path tags replaced) in ``app_home``,
        after creating ``build_dir``. Output goes to ``build_out`` and
        ``build_err``.
        """
        build = self.get("build")
        if build:
            self.info(" Building sources ")
            
            # create dir for build
            self.node.mkdir(self.build_dir)

            cmd = self.replace_paths(build)

            # NOTE(review): run_and_wait is not defined in this class, while
            # every other remote operation goes through self.node - confirm
            # whether this should be self.node.run_and_wait.
            self.run_and_wait(cmd, self.app_home,
                pidfile = "build_pid",
                stdout = "build_out", 
                stderr = "build_err", 
                raise_on_error = True)
 
    def install(self):
        """ Runs the ``install`` command (path tags replaced) in ``app_home``.
        Output goes to ``install_out`` and ``install_err``.
        """
        install = self.get("install")
        if install:
            self.info(" Installing sources ")

            cmd = self.replace_paths(install)

            # NOTE(review): see build() regarding self.run_and_wait.
            self.run_and_wait(cmd, self.app_home, 
                pidfile = "install_pid",
                stdout = "install_out", 
                stderr = "install_err", 
                raise_on_error = True)

    def deploy(self):
        """ Discovers and provisions the application once its node is READY.

        While no node is connected, or the node is not READY yet, the
        deployment is rescheduled after ``reschedule_delay``. Any error raised
        by discover/provision marks the resource FAILED and is re-raised.
        """
        # Wait until node is associated and deployed
        node = self.node
        if not node or node.state < ResourceState.READY:
            # node may be None here, so never dereference it unconditionally
            node_state = node.state if node else None
            self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % node_state )
            self.ec.schedule(reschedule_delay, self.deploy)
            return

        # replace_paths needs the node (for its paths), so resolve the
        # command only once the node is known to exist.
        command = self.replace_paths(self.get("command"))
        self.info(" Deploying command '%s' " % command)

        try:
            self.discover()
            self.provision()
        except:
            # Mark as failed on any error, then propagate it unchanged
            self._state = ResourceState.FAILED
            raise

        super(LinuxApplication, self).deploy()

    def start(self):
        """ Starts the application command on the node.

        With ``forwardX11`` set, the command is executed directly via
        ``node.execute`` with X11 forwarding. Otherwise the ``app.sh`` script
        uploaded by ``provision`` is launched in background via ``node.run``
        and the resulting pid/ppid are recorded.

        :raises RuntimeError: if the start call reports an error, no pid/ppid
            is obtained (background mode), or reading the ``stderr`` trace
            right after starting returns output or an error.
        """
        command = self.replace_paths(self.get("command"))
        env = self.get("env")
        stdin = 'stdin' if self.get("stdin") else None
        sudo = self.get('sudo') or False
        x11 = self.get("forwardX11") or False
        failed = False

        super(LinuxApplication, self).start()

        self.info("Starting command '%s'" % command)

        if x11:
            (out, err), proc = self.node.execute(command,
                    sudo = sudo,
                    stdin = stdin,
                    stdout = 'stdout',
                    stderr = 'stderr',
                    env = env,
                    forward_x11 = x11)

            if proc.poll() and err:
                failed = True
        else:
            # Run the pre-uploaded command asynchronously.
            # NOTE(review): ``env`` is not applied in this mode - it is neither
            # written into app.sh by provision() nor passed to node.run().
            (out, err), proc = self.node.run("bash ./app.sh", self.app_home, 
                stdin = stdin, 
                sudo = sudo)

            if proc.poll() and err:
                failed = True
        
            if not failed:
                pid, ppid = self.node.wait_pid(home = self.app_home)
                if pid:
                    self._pid = int(pid)
                if ppid:
                    self._ppid = int(ppid)

            if not self.pid or not self.ppid:
                failed = True
 
        # Any output or error from reading the stderr trace right after start
        # is treated as a start failure.
        (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')

        if failed or out or chkerr:
            # Report the user's command, not the internal "bash ./app.sh"
            msg = " Failed to start command '%s' " % command
            # Prefer the error from the start call; fall back to the error
            # from reading the stderr trace.
            if not err and chkerr:
                err = chkerr

            self.error(msg, out, err)

            msg2 = " Setting state to Failed"
            self.debug(msg2)
            self._state = ResourceState.FAILED

            raise RuntimeError(msg)

    def stop(self):
        """ Kills the running command if the resource is STARTED.

        If the kill call produces any output or error the resource is marked
        FAILED; otherwise the base class stop is invoked.
        """
        state = self.state
        if state == ResourceState.STARTED:
            command = self.get("command")
            self.info("Stopping command %s" % command)

            (out, err), proc = self.node.kill(self.pid, self.ppid)

            if out or err:
                # check if execution errors occurred
                msg = " Failed to STOP command '%s' " % command
                self.error(msg, out, err)
                self._state = ResourceState.FAILED
            else:
                super(LinuxApplication, self).stop()

    def release(self):
        """ Runs the ``tearDown`` script (if any), stops the command, and
        releases the resource through the base class once it is STOPPED.
        """
        self.info("Releasing resource")

        tear_down = self.get("tearDown")
        if tear_down:
            self.node.execute(tear_down)

        self.stop()
        # NOTE(review): stop() only acts on STARTED, so an application that
        # already FINISHED never reaches the base release - confirm intended.
        if self.state == ResourceState.STOPPED:
            super(LinuxApplication, self).release()
    
    @property
    def state(self):
        """ Current resource state, refreshed from the node while STARTED.

        While STARTED:
        - if reading the ``stderr`` trace reports "No such file or directory",
          the command has not run yet and READY is returned (the stored state
          is left unchanged);
        - any other stderr output or read error marks the resource FAILED;
        - otherwise, if pid/ppid are known and the node reports the process
          as finished, the resource becomes FINISHED.
        """
        if self._state == ResourceState.STARTED:
            (out, err), proc = self.node.check_output(self.app_home, 'stderr')

            if out or err:
                # err may be empty or None when only out is set
                if err and err.find("No such file or directory") >= 0:
                    # The resource is marked as started, but the
                    # command was not yet executed
                    return ResourceState.READY

                # check if execution errors occurred
                msg = " Failed to execute command '%s'" % self.get("command")
                self.error(msg, out, err)
                self._state = ResourceState.FAILED

            elif self.pid and self.ppid:
                status = self.node.status(self.pid, self.ppid)

                if status == sshfuncs.FINISHED:
                    self._state = ResourceState.FINISHED

        return self._state

    def valid_connection(self, guid):
        """ Accepts every connection (validation not implemented yet). """
        # TODO: Validate! Also decide what to do if the application is
        # connected to more than one node.
        return True

    def hash_app(self):
        """ Generates a hash representing univokely the application.
        Is used to determine whether the home directory should be cleaned
        or not.

        The hash is the md5 hex digest of all registered attribute values
        (stringified, in registration order, separated by NUL characters).
        """
        import hashlib

        attr_names = ("command", "forwardX11", "env", "sudo", "depends",
                "sources", "code", "build", "install", "stdin", "stdout",
                "stderr", "tearDown")
        values = [str(self.get(attr_name)) for attr_name in attr_names]

        # The separator keeps different value lists (e.g. "ab","c" and
        # "a","bc") from producing the same key.
        skey = "\0".join(values)
        if not isinstance(skey, bytes):
            # Python 3 str must be encoded for md5 (no-op on Python 2)
            skey = skey.encode("utf-8")
        return hashlib.md5(skey).hexdigest()

    def replace_paths(self, command):
        """ Replaces the special path tags in ``command`` with actual paths:
        ${SOURCES} -> src_dir, ${BUILD} -> build_dir,
        ${APPHOME} -> app_home, ${NODEHOME} -> the node's home.

        Paths are inserted verbatim - they are NOT shell-escaped.
        An empty or None ``command`` is returned unchanged.
        """
        if not command:
            return command

        return ( command
            .replace("${SOURCES}", self.src_dir)
            .replace("${BUILD}", self.build_dir) 
            .replace("${APPHOME}", self.app_home) 
            .replace("${NODEHOME}", self.node.home) )
446             .replace("${APPHOME}", self.app_home) 
447             .replace("${NODEHOME}", self.node.home) )
448
449