Added Linux Application
[nepi.git] / src / neco / resources / linux / application.py
1 from neco.execution.attribute import Attribute, Flags, Types
2 from neco.execution.trace import Trace, TraceAttr
3 from neco.execution.resource import ResourceManager, clsinit, ResourceState
4 from neco.resources.linux.node import LinuxNode
5 from neco.util import sshfuncs 
6
7 import logging
8 import os
9
10 DELAY ="1s"
11
12 # TODO: Resolve wildcards in commands!! 
13
14 @clsinit
15 class LinuxApplication(ResourceManager):
16     _rtype = "LinuxApplication"
17
18     @classmethod
19     def _register_attributes(cls):
20         command = Attribute("command", "Command to execute", 
21                 flags = Flags.ExecReadOnly)
22         forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
23                 flags = Flags.ExecReadOnly)
24         env = Attribute("env", "Environment variables string for command execution",
25                 flags = Flags.ExecReadOnly)
26         sudo = Attribute("sudo", "Run with root privileges", 
27                 flags = Flags.ExecReadOnly)
28         depends = Attribute("depends", 
29                 "Space-separated list of packages required to run the application",
30                 flags = Flags.ExecReadOnly)
31         sources = Attribute("sources", 
32                 "Space-separated list of regular files to be deployed in the working "
33                 "path prior to building. Archives won't be expanded automatically.",
34                 flags = Flags.ExecReadOnly)
35         code = Attribute("code", 
36                 "Plain text source code to be uploaded to the server. It will be stored "
37                 "under ${SOURCES}/code",
38                 flags = Flags.ExecReadOnly)
39         build = Attribute("build", 
40                 "Build commands to execute after deploying the sources. "
41                 "Sources will be in the ${SOURCES} folder. "
42                 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
43                 "Try to make the commands return with a nonzero exit code on error.\n"
44                 "Also, do not install any programs here, use the 'install' attribute. This will "
45                 "help keep the built files constrained to the build folder (which may "
46                 "not be the home folder), and will result in faster deployment. Also, "
47                 "make sure to clean up temporary files, to reduce bandwidth usage between "
48                 "nodes when transferring built packages.",
49                 flags = Flags.ReadOnly)
50         install = Attribute("install", 
51                 "Commands to transfer built files to their final destinations. "
52                 "Sources will be in the initial working folder, and a special "
53                 "tag ${SOURCES} can be used to reference the experiment's "
54                 "home folder (where the application commands will run).\n"
55                 "ALL sources and targets needed for execution must be copied there, "
56                 "if building has been enabled.\n"
57                 "That is, 'slave' nodes will not automatically get any source files. "
58                 "'slave' nodes don't get build dependencies either, so if you need "
59                 "make and other tools to install, be sure to provide them as "
60                 "actual dependencies instead.",
61                 flags = Flags.ReadOnly)
62         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
63         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
64         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
65         update_home = Attribute("updateHome", "If application hash has changed remove old directory and"
66                 "re-upload before starting experiment. If not keep the same directory", 
67                 default = True,
68                 type = Types.Bool, 
69                 flags = Flags.ExecReadOnly)
70
71         tear_down = Attribute("tearDown", "Bash script to be executed before "
72                 "releasing the resource", 
73                 flags = Flags.ReadOnly)
74
75         cls._register_attribute(command)
76         cls._register_attribute(forward_x11)
77         cls._register_attribute(env)
78         cls._register_attribute(sudo)
79         cls._register_attribute(depends)
80         cls._register_attribute(sources)
81         cls._register_attribute(code)
82         cls._register_attribute(build)
83         cls._register_attribute(install)
84         cls._register_attribute(stdin)
85         cls._register_attribute(stdout)
86         cls._register_attribute(stderr)
87         cls._register_attribute(update_home)
88         cls._register_attribute(tear_down)
89
90     @classmethod
91     def _register_traces(cls):
92         stdout = Trace("stdout", "Standard output stream")
93         stderr = Trace("stderr", "Standard error stream")
94         buildlog = Trace("buildlog", "Output of the build process")
95
96         cls._register_trace(stdout)
97         cls._register_trace(stderr)
98         cls._register_trace(buildlog)
99
100     def __init__(self, ec, guid):
101         super(LinuxApplication, self).__init__(ec, guid)
102         self._pid = None
103         self._ppid = None
104         self._home = "app-%s" % self.guid
105
106         self._logger = logging.getLogger("neco.linux.Application.%d" % guid)
107
108     @property
109     def node(self):
110         node = self.get_connected(LinuxNode.rtype())
111         if node: return node[0]
112         return None
113
114     @property
115     def home(self):
116         return os.path.join(self.node.exp_dir, self._home)
117
118     @property
119     def src_dir(self):
120         return os.path.join(self.home, 'src')
121
122     @property
123     def build_dir(self):
124         return os.path.join(self.home, 'build')
125
126     @property
127     def pid(self):
128         return self._pid
129
130     @property
131     def ppid(self):
132         return self._ppid
133
134     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
135         path = os.path.join(self.home, name)
136         
137         cmd = "(test -f %s && echo 'success') || echo 'error'" % path
138         (out, err), proc = self.node.execute(cmd)
139
140         if (err and proc.poll()) or out.find("error") != -1:
141             err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
142                     name, self.node.get("hostname"), err)
143             self.logger.error(err_msg)
144             return None
145     
146         if attr == TraceAttr.PATH:
147             return path
148
149         if attr == TraceAttr.ALL:
150             (out, err), proc = self.node.check_output(self.home, name)
151             
152             if err and proc.poll():
153                 err_msg = " Couldn't read trace %s on host %s. Error: %s" % (
154                             name, self.node.get("hostname"), err)
155                 self.logger.error(err_msg)
156                 return None
157
158             return out
159
160         if attr == TraceAttr.STREAM:
161             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
162         elif attr == TraceAttr.SIZE:
163             cmd = "stat -c%%s %s " % path
164
165         (out, err), proc = self.node.execute(cmd)
166
167         if err and proc.poll():
168             err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
169                     name, self.node.get("hostname"), err)
170             self.logger.error(err_msg)
171             return None
172         
173         if attr == TraceAttr.SIZE:
174             out = int(out.strip())
175
176         return out
177             
178     def provision(self, filters = None):
179         # TODO: verify home hash or clean home
180
181         # create home dir for application
182         self.node.mkdir(self.home)
183
184         # upload sources
185         self.upload_sources()
186
187         # upload code
188         self.upload_code()
189
190         # install dependencies
191         self.install_dependencies()
192
193         # build
194         self.build()
195
196         # Install
197         self.install()
198
199         super(LinuxApplication, self).provision()
200
201     def upload_sources(self):
202         # check if sources need to be uploaded and upload them
203         sources = self.get("sources")
204         if sources:
205             self.logger.debug(" Uploading sources %s" % sources)
206
207             # create dir for sources
208             self.node.mkdir(self.src_dir)
209
210             sources = self.sources.split(' ')
211
212             http_sources = list()
213             for source in list(sources):
214                 if source.startswith("http") or source.startswith("https"):
215                     http_sources.append(source)
216                     sources.remove(source)
217
218             # Download http sources
219             for source in http_sources:
220                 dst = os.path.join(self.src_dir, source.split("/")[-1])
221                 command = "wget -o %s %s" % (dst, source)
222                 self.node.execute(command)
223
224             self.node.upload(sources, self.src_dir)
225
226     def upload_code(self):
227         code = self.get("code")
228         if code:
229             # create dir for sources
230             self.node.mkdir(self.src_dir)
231
232             self.logger.debug(" Uploading code '%s'" % code)
233
234             dst = os.path.join(self.src_dir, "code")
235             self.node.upload(sources, dst, text = True)
236
237     def install_dependencies(self):
238         depends = self.get("depends")
239         if depends:
240             self.logger.debug(" Installing dependencies %s" % depends)
241             self.node.install_packages(depends, home = self.home)
242
243     def build(self):
244         build = self.get("build")
245         if build:
246             self.logger.debug(" Building sources '%s'" % build)
247             
248             # create dir for build
249             self.node.mkdir(self.build_dir)
250
251             cmd = self.replace_paths(build)
252
253             (out, err), proc = self.run_and_wait(cmd, self.home,
254                 pidfile = "build_pid",
255                 stdout = "build_log", 
256                 stderr = "build_err", 
257                 raise_on_error = True)
258  
259     def install(self):
260         install = self.get("install")
261         if install:
262             self.logger.debug(" Installing sources '%s'" % install)
263
264             cmd = self.replace_paths(install)
265
266             (out, err), proc = self.run_and_wait(cmd, self.home, 
267                 pidfile = "install_pid",
268                 stdout = "install_log", 
269                 stderr = "install_err", 
270                 raise_on_error = True)
271
272     def deploy(self):
273         # Wait until node is associated and deployed
274         node = self.node
275         if not node or node.state < ResourceState.READY:
276             self.ec.schedule(DELAY, self.deploy)
277         else:
278             self.discover()
279             self.provision()
280
281             super(LinuxApplication, self).deploy()
282
283     def start(self):
284         command = self.replace_paths(self.get("command"))
285         env = self.get("env")
286         stdin = 'stdin' if self.get("stdin") else None
287         sudo = self.get('sudo') or False
288         x11 = self.get("forwardX11") or False
289         err_msg = "Failed to run command %s on host %s" % (
290                      command, self.node.get("hostname"))
291         failed = False
292
293         super(LinuxApplication, self).start()
294
295         if x11:
296             (out, err), proc = self.node.execute(command,
297                     sudo = sudo,
298                     stdin = stdin,
299                     stdout = 'stdout',
300                     stderr = 'stderr',
301                     env = env,
302                     forward_x11 = x11)
303
304             if proc.poll() and err:
305                 failed = True
306         else:
307             (out, err), proc = self.node.run(command, self.home, 
308                 stdin = stdin, 
309                 sudo = sudo)
310
311             if proc.poll() and err:
312                 failed = True
313         
314             if not failed:
315                 pid, ppid = self.node.wait_pid(home = self.home)
316                 if pid: self._pid = int(pid)
317                 if ppid: self._ppid = int(ppid)
318
319             if not self.pid or not self.ppid:
320                 failed = True
321  
322         (out, chkerr), proc = self.node.check_output(self.home, 'stderr')
323
324         if failed or out or chkerr:
325             # check if execution errors occurred
326             if err:
327                 err_msg = "%s. Proc error: %s" % (err_msg, err)
328
329             err_msg = "%s. Run error: %s " % (err_msg, out)
330
331             if chkerr:
332                 err_msg = "%s. Failed to check error: %s" % (err_msg, chkerr)
333
334             self.logger.error(err_msg)
335             self.state = ResourceState.FAILED
336
337     def stop(self):
338         state = self.state
339         if state == ResourceState.STARTED:
340             (out, err), proc = self.node.kill(self.pid, self.ppid)
341
342             if out or err:
343                 # check if execution errors occurred
344                 err_msg = " Failed to STOP command '%s' on host %s. Check error: %s. Run error: %s" % (
345                      self.get("command"), self.node.get("hostname"), err, out)
346                 self.logger.error(err_msg)
347                 self._state = ResourceState.FAILED
348                 stopped = False
349             else:
350                 super(LinuxApplication, self).stop()
351
352     def release(self):
353         tear_down = self.get("tearDown")
354         if tear_down:
355             self.node.execute(tear_down)
356
357         self.stop()
358         if self.state == ResourceState.STOPPED:
359             super(LinuxApplication, self).release()
360     
361     @property
362     def state(self):
363         if self._state == ResourceState.STARTED:
364             (out, err), proc = self.node.check_output(self.home, 'stderr')
365
366             if out or err:
367                 # check if execution errors occurred
368                 err_msg = " Failed to execute command '%s' on host %s. Check error: %s. Run error: %s" % (
369                         self.get("command"), self.node.get("hostname"), err, out)
370                 self.logger.error(err_msg)
371                 self._state = ResourceState.FAILED
372
373             elif self.pid and self.ppid:
374                 status = self.node.status(self.pid, self.ppid)
375
376                 if status == sshfuncs.FINISHED:
377                     self._state = ResourceState.FINISHED
378
379         return self._state
380
381     def valid_connection(self, guid):
382         # TODO: Validate!
383         return True
384         # XXX: What if it is connected to more than one node?
385         resources = self.find_resources(exact_tags = [tags.NODE])
386         self._node = resources[0] if len(resources) == 1 else None
387         return self._node
388
389     def hash_app(self):
390         """ Generates a hash representing univokely the application.
391         Is used to determine whether the home directory should be cleaned
392         or not.
393
394         """
395         command = self.get("command")
396         forwards_x11 = self.get("forwardX11")
397         env = self.get("env")
398         sudo = self.get("sudo")
399         depends = self.get("depends")
400         sources = self.get("sources")
401         cls._register_attribute(sources)
402         cls._register_attribute(build)
403         cls._register_attribute(install)
404         cls._register_attribute(stdin)
405         cls._register_attribute(stdout)
406         cls._register_attribute(stderr)
407         cls._register_attribute(tear_down)
408         skey = "".join(map(str, args))
409         return hashlib.md5(skey).hexdigest()
410
411     def replace_paths(self, command):
412         """
413         Replace all special path tags with shell-escaped actual paths.
414         """
415         return ( command
416             .replace("${SOURCES}", self.src_dir)
417             .replace("${BUILD}", self.build_dir) 
418             .replace("${APPHOME}", self.home) 
419             .replace("${NODEHOME}", self.node.home) )
420
421