Merge neco to nepi-3.0
[nepi.git] / src / nepi / resources / linux / application.py
1 from nepi.execution.attribute import Attribute, Flags, Types
2 from nepi.execution.trace import Trace, TraceAttr
3 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
4 from nepi.resources.linux.node import LinuxNode
5 from nepi.util import sshfuncs 
6 from nepi.util.timefuncs import strfnow, strfdiff
7
8 import logging
9 import os
10
# Delay handed to ec.schedule() when deploy() must wait for the node to be
# READY before retrying.
reschedule_delay = "0.5s"

# Minimum gap between two remote state checks done by the `state` property,
# compared against strfdiff(strfnow(), last_check) (presumably seconds —
# confirm against nepi.util.timefuncs.strfdiff).
state_check_delay = 1

# TODO: Resolve wildcards in commands!!
@clsinit
class LinuxApplication(ResourceManager):
    """Resource manager that deploys and runs a shell command on a LinuxNode.

    Each application gets its own home directory, ``app-<guid>``, under the
    node's ``exp_home``. ``provision`` uploads sources, inline code and stdin.
    It then installs package dependencies and runs the optional build and
    install scripts. Unless X11 forwarding is requested, it also uploads the
    command as ``app.sh``. ``start`` then either runs the command
    synchronously (X11) or launches ``app.sh`` in the background and records
    its pid/ppid.
    """
    _rtype = "LinuxApplication"

    @classmethod
    def _register_attributes(cls):
        """Declare the user-configurable attributes of this resource."""
        command = Attribute("command", "Command to execute", 
                flags = Flags.ExecReadOnly)
        forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
                flags = Flags.ExecReadOnly)
        env = Attribute("env", "Environment variables string for command execution",
                flags = Flags.ExecReadOnly)
        sudo = Attribute("sudo", "Run with root privileges", 
                flags = Flags.ExecReadOnly)
        depends = Attribute("depends", 
                "Space-separated list of packages required to run the application",
                flags = Flags.ExecReadOnly)
        sources = Attribute("sources", 
                "Space-separated list of regular files to be deployed in the working "
                "path prior to building. Archives won't be expanded automatically.",
                flags = Flags.ExecReadOnly)
        code = Attribute("code", 
                "Plain text source code to be uploaded to the server. It will be stored "
                "under ${SOURCES}/code",
                flags = Flags.ExecReadOnly)
        build = Attribute("build", 
                "Build commands to execute after deploying the sources. "
                "Sources will be in the ${SOURCES} folder. "
                "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
                "Try to make the commands return with a nonzero exit code on error.\n"
                "Also, do not install any programs here, use the 'install' attribute. This will "
                "help keep the built files constrained to the build folder (which may "
                "not be the home folder), and will result in faster deployment. Also, "
                "make sure to clean up temporary files, to reduce bandwidth usage between "
                "nodes when transferring built packages.",
                flags = Flags.ReadOnly)
        # NOTE(review): replace_paths() maps ${SOURCES} to src_dir, not to the
        # experiment home as this description states — confirm which is intended.
        install = Attribute("install", 
                "Commands to transfer built files to their final destinations. "
                "Sources will be in the initial working folder, and a special "
                "tag ${SOURCES} can be used to reference the experiment's "
                "home folder (where the application commands will run).\n"
                "ALL sources and targets needed for execution must be copied there, "
                "if building has been enabled.\n"
                "That is, 'slave' nodes will not automatically get any source files. "
                "'slave' nodes don't get build dependencies either, so if you need "
                "make and other tools to install, be sure to provide them as "
                "actual dependencies instead.",
                flags = Flags.ReadOnly)
        stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
        stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
        stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
        tear_down = Attribute("tearDown", "Bash script to be executed before "
                "releasing the resource", 
                flags = Flags.ReadOnly)

        for attr in (command, forward_x11, env, sudo, depends, sources, code,
                build, install, stdin, stdout, stderr, tear_down):
            cls._register_attribute(attr)

    @classmethod
    def _register_traces(cls):
        """Declare the traces (files in app_home) this resource exposes."""
        stdout = Trace("stdout", "Standard output stream")
        stderr = Trace("stderr", "Standard error stream")
        buildlog = Trace("buildlog", "Output of the build process")

        for trace in (stdout, stderr, buildlog):
            cls._register_trace(trace)

    def __init__(self, ec, guid):
        super(LinuxApplication, self).__init__(ec, guid)
        self._pid = None
        self._ppid = None
        # Directory name (relative to node.exp_home) holding this app's files.
        self._home = "app-%s" % self.guid

        # timestamp of last state check of the application
        self._last_state_check = strfnow()

        self._logger = logging.getLogger("LinuxApplication")

    def log_message(self, msg):
        """Prefix ``msg`` with this resource's guid and its node's hostname.

        Safe to call before a node is connected (hostname is then None).
        """
        node = self.node
        hostname = node.get("hostname") if node else None
        return " guid %d - host %s - %s " % (self.guid, hostname, msg)

    @property
    def node(self):
        """The first connected LinuxNode, or None if none is connected."""
        node = self.get_connected(LinuxNode.rtype())
        if node: return node[0]
        return None

    @property
    def app_home(self):
        """Remote directory of this application: <node.exp_home>/app-<guid>."""
        return os.path.join(self.node.exp_home, self._home)

    @property
    def src_dir(self):
        """Remote directory where sources and code are placed (${SOURCES})."""
        return os.path.join(self.app_home, 'src')

    @property
    def build_dir(self):
        """Remote directory created for builds (${BUILD})."""
        return os.path.join(self.app_home, 'build')

    @property
    def pid(self):
        """Pid of the running command as reported by node.wait_pid, or None."""
        return self._pid

    @property
    def ppid(self):
        """Parent pid of the running command as reported by node.wait_pid, or None."""
        return self._ppid

    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """Retrieve trace ``name`` (a file in app_home) from the node.

        :param name: trace file name, relative to ``app_home``.
        :param attr: ``TraceAttr.PATH`` returns the remote path,
            ``TraceAttr.ALL`` the whole content, ``TraceAttr.STREAM`` one
            ``block``-byte chunk after skipping ``offset`` blocks (dd
            semantics), and ``TraceAttr.SIZE`` the size in bytes as an int.
        :returns: the requested value, or None when the trace does not exist
            or could not be read (the error is logged).
        :raises ValueError: if ``attr`` is not one of the values above.
        """
        self.info("Retrieving '%s' trace %s " % (name, attr))

        path = os.path.join(self.app_home, name)

        command = "(test -f %s && echo 'success') || echo 'error'" % path
        (out, err), proc = self.node.execute(command)

        if (err and proc.poll()) or out.find("error") != -1:
            msg = " Couldn't find trace %s " % name
            self.error(msg, out, err)
            return None

        if attr == TraceAttr.PATH:
            return path

        if attr == TraceAttr.ALL:
            (out, err), proc = self.node.check_output(self.app_home, name)

            if err and proc.poll():
                msg = " Couldn't read trace %s " % name
                self.error(msg, out, err)
                return None

            return out

        if attr == TraceAttr.STREAM:
            cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
        elif attr == TraceAttr.SIZE:
            cmd = "stat -c%%s %s " % path
        else:
            # Reject unknown attributes explicitly instead of failing later
            # on an unset `cmd`.
            raise ValueError("Unsupported trace attribute %r" % (attr,))

        (out, err), proc = self.node.execute(cmd)

        if err and proc.poll():
            msg = " Couldn't find trace %s " % name
            self.error(msg, out, err)
            return None

        if attr == TraceAttr.SIZE:
            out = int(out.strip())

        return out

    def _env_exports(self, separator):
        """Return one ``export <token><separator>`` per token of the env attribute.

        Tokens are split on any whitespace, so empty tokens are dropped. With
        ``split(" ")``, an unset env produced a bare ``export``, which makes
        bash print every exported variable.
        """
        env = self.get("env") or ""
        return "".join("export %s%s" % (var, separator) for var in env.split())

    def provision(self, filters = None):
        """Prepare the application on the node.

        Creates app_home, then uploads sources, code and stdin, installs
        dependencies, and runs the build and install scripts. Unless
        forwardX11 is set, it finally uploads the command (preceded by env
        exports, with path tags replaced) as ``app.sh`` for ``start``.
        """
        # create home dir for application
        self.node.mkdir(self.app_home)

        self.upload_sources()
        self.upload_code()
        self.upload_stdin()
        self.install_dependencies()
        self.build()
        self.install()

        command = self.get("command")
        x11 = self.get("forwardX11")
        if not x11 and command:
            self.info("Uploading command '%s'" % command)

            # Export environment at the top of the script
            command = self._env_exports("\n") + command

            # The command runs asynchronously, so pre-upload it to the
            # app.sh file in the remote host
            dst = os.path.join(self.app_home, "app.sh")
            command = self.replace_paths(command)
            self.node.upload(command, dst, text = True)

        super(LinuxApplication, self).provision()

    def upload_sources(self):
        """Deploy the space-separated ``sources`` into src_dir.

        Entries starting with "http" are fetched on the node with wget
        (followed by an ``ls`` of each expected file) via upload_and_run.
        The remaining entries are passed to node.upload (presumably local
        file paths).
        """
        # TODO: check if sources need to be uploaded and upload them
        sources = self.get("sources")
        if not sources:
            return

        self.info(" Uploading sources ")

        # create dir for sources
        self.node.mkdir(self.src_dir)

        entries = sources.split()
        http_sources = [s for s in entries if s.startswith("http")]
        local_sources = [s for s in entries if not s.startswith("http")]

        # Download http sources
        if http_sources:
            downloads = "".join(" %s " % (source) for source in http_sources)
            verif = "".join(" ls ${SOURCES}/%s ;" % os.path.basename(source)
                    for source in http_sources)

            # Wget output goes to stderr, hence the redirection; the trailing
            # `ls` commands verify that every file was actually downloaded.
            cmd = (" wget -c --directory-prefix=${SOURCES} " + downloads
                    + " 2> /dev/null ; " + " %s " % verif)

            # Upload the command to a file, and execute asynchronously
            self.upload_and_run(cmd, 
                    "http_sources.sh", "http_sources_pid", 
                    "http_sources_out", "http_sources_err")

        if local_sources:
            self.node.upload(local_sources, self.src_dir)

    def upload_code(self):
        """Upload the ``code`` attribute text as the file ${SOURCES}/code."""
        code = self.get("code")
        if code:
            # create dir for sources
            self.node.mkdir(self.src_dir)

            self.info(" Uploading code ")

            dst = os.path.join(self.src_dir, "code")
            self.node.upload(code, dst, text = True)

    def upload_stdin(self):
        """Upload the ``stdin`` attribute text as the file app_home/stdin."""
        stdin = self.get("stdin")
        if stdin:
            self.info(" Uploading stdin ")

            dst = os.path.join(self.app_home, "stdin")
            self.node.upload(stdin, dst, text = True)

    def install_dependencies(self):
        """Install the packages listed in ``depends`` using the node."""
        depends = self.get("depends")
        if depends:
            self.info(" Installing dependencies %s" % depends)
            self.node.install_packages(depends, home = self.app_home)

    def build(self):
        """Create build_dir and run the ``build`` script, waiting for it to end."""
        build = self.get("build")
        if build:
            self.info(" Building sources ")

            # create dir for build
            self.node.mkdir(self.build_dir)

            # Upload the command to a file, and execute asynchronously
            self.upload_and_run(build, 
                    "build.sh", "build_pid", 
                    "build_out", "build_err")

    def install(self):
        """Run the ``install`` script in app_home, waiting for it to end."""
        install = self.get("install")
        if install:
            self.info(" Installing sources ")

            # Upload the command to a file, and execute asynchronously
            self.upload_and_run(install, 
                    "install.sh", "install_pid", 
                    "install_out", "install_err")

    def deploy(self):
        """Discover and provision once the node is READY; otherwise reschedule.

        Any exception from discover/provision marks the resource FAILED and
        is re-raised.
        """
        # Wait until node is associated and deployed
        node = self.node
        if not node or node.state < ResourceState.READY:
            node_state = node.state if node else None
            self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % node_state)
            self.ec.schedule(reschedule_delay, self.deploy)
            return

        try:
            command = self.get("command") or ""
            self.info(" Deploying command '%s' " % command)
            self.discover()
            self.provision()
        except:
            # Deliberately broad: only records the failure, then re-raises.
            self._state = ResourceState.FAILED
            raise

        super(LinuxApplication, self).deploy()

    def start(self):
        """Start the command.

        Without a command, the resource goes straight to FINISHED. With
        forwardX11, the command runs synchronously and the resource ends
        FINISHED. Otherwise ``app.sh`` is launched in the background and
        pid/ppid are recorded.

        :raises RuntimeError: if the command fails to start (state is set to
            FAILED first).
        """
        command = self.get('command')
        stdin = 'stdin' if self.get('stdin') else None
        # Output is always captured under these names in app_home.
        stdout = 'stdout'
        stderr = 'stderr'
        sudo = self.get('sudo') or False
        x11 = self.get('forwardX11') or False

        super(LinuxApplication, self).start()

        if not command:
            self.info("No command to start ")
            self._state = ResourceState.FINISHED
            return 

        self.info("Starting command '%s'" % command)

        if x11:
            self._start_x11(command, stdin, sudo, x11)
        else:
            self._start_async(command, stdin, stdout, stderr, sudo)

    def _start_x11(self, command, stdin, sudo, x11):
        """Run ``command`` synchronously with X11 forwarding; fail on error."""
        remote_command = command
        exports = self._env_exports(" ; ")
        if exports:
            remote_command = "(" + exports + remote_command + ")"
        # Path tags must be resolved whether or not env is set.
        remote_command = self.replace_paths(remote_command)

        # If the command requires X11 forwarding, we
        # can't run it asynchronously
        (out, err), proc = self.node.execute(remote_command,
                sudo = sudo,
                stdin = stdin,
                forward_x11 = x11)

        self._state = ResourceState.FINISHED

        if proc.poll() and err:
            self._fail_start(command, out, err)

    def _start_async(self, command, stdin, stdout, stderr, sudo):
        """Launch the pre-uploaded app.sh in background and record pid/ppid."""
        # Command was previously uploaded, now run the remote
        # bash file asynchronously
        cmd = "bash ./app.sh"
        (out, err), proc = self.node.run(cmd, self.app_home, 
            stdin = stdin, 
            stdout = stdout,
            stderr = stderr,
            sudo = sudo)

        failed = bool(proc.poll() and err)

        if not failed:
            pid, ppid = self.node.wait_pid(home = self.app_home)
            if pid: self._pid = int(pid)
            if ppid: self._ppid = int(ppid)

        if not self.pid or not self.ppid:
            failed = True

        # Anything already written to the stderr file also counts as failure.
        (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')

        if failed or out or chkerr:
            if not err and chkerr:
                err = chkerr
            self._fail_start(command, out, err)

    def _fail_start(self, command, out, err):
        """Log a start failure, mark the resource FAILED and raise RuntimeError."""
        msg = " Failed to start command '%s' " % command
        self.error(msg, out, err)

        self.debug(" Setting state to Failed")
        self._state = ResourceState.FAILED

        raise RuntimeError(msg)

    def stop(self):
        """Kill the command if it is STARTED; mark FAILED if kill reports output."""
        command = self.get('command') or ''

        if self.state != ResourceState.STARTED:
            return

        self.info("Stopping command '%s'" % command)

        (out, err), proc = self.node.kill(self.pid, self.ppid)

        if out or err:
            # check if execution errors occurred
            msg = " Failed to STOP command '%s' " % self.get("command")
            self.error(msg, out, err)
            self._state = ResourceState.FAILED
        else:
            super(LinuxApplication, self).stop()

    def release(self):
        """Run tearDown (if set), stop, and release only once STOPPED."""
        self.info("Releasing resource")

        tear_down = self.get("tearDown")
        if tear_down:
            self.node.execute(tear_down)

        self.stop()
        # NOTE(review): an application that is already FINISHED is not
        # STARTED, so stop() does nothing and the base release is skipped —
        # confirm this is intended.
        if self.state == ResourceState.STOPPED:
            super(LinuxApplication, self).release()

    @property
    def state(self):
        """Current state; while STARTED, refreshed from the node.

        The refresh runs at most once per ``state_check_delay``. Output in the
        remote stderr file marks the app FAILED. A missing stderr file reports
        READY (not yet running). A FINISHED status from node.status marks the
        app FINISHED.
        """
        if self._state == ResourceState.STARTED:
            # To avoid overwhelming the remote hosts and the local processor
            # with too many ssh queries, the state is only requested
            # every 'state_check_delay' .
            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                # check if execution errors occurred
                (out, err), proc = self.node.check_output(self.app_home, 'stderr')

                if out or err:
                    if err and "No such file or directory" in err:
                        # The resource is marked as started, but the
                        # command was not yet executed
                        return ResourceState.READY

                    msg = " Failed to execute command '%s'" % self.get("command")
                    self.error(msg, out, err)
                    self._state = ResourceState.FAILED

                elif self.pid and self.ppid:
                    status = self.node.status(self.pid, self.ppid)

                    if status == sshfuncs.FINISHED:
                        self._state = ResourceState.FINISHED

                self._last_state_check = strfnow()

        return self._state

    def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
        """Upload ``cmd`` (path tags replaced) as app_home/``fname`` and run it.

        Runs ``bash ./<fname>`` in app_home through node.run_and_wait with
        ``raise_on_error = True``, so errors are raised by the node layer.
        """
        dst = os.path.join(self.app_home, fname)
        cmd = self.replace_paths(cmd)
        self.node.upload(cmd, dst, text = True)

        cmd = "bash ./%s" % fname
        (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
            pidfile = pidfile,
            stdout = outfile, 
            stderr = errfile, 
            raise_on_error = True)

    def replace_paths(self, command):
        """
        Replace the special path tags (${SOURCES}, ${BUILD}, ${APP_HOME},
        ${NODE_HOME}, ${EXP_HOME}) with the corresponding directories.

        Relative directories are prefixed with the literal "${HOME}" so the
        remote shell expands them. Paths are NOT shell-escaped.
        """
        def absolute_dir(d):
            return d if d.startswith("/") else os.path.join("${HOME}", d)

        return ( command
            .replace("${SOURCES}", absolute_dir(self.src_dir))
            .replace("${BUILD}", absolute_dir(self.build_dir))
            .replace("${APP_HOME}", absolute_dir(self.app_home))
            .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
            .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
            )

    def valid_connection(self, guid):
        """Accept every connection (validation not implemented yet)."""
        # TODO: Validate that guid is a LinuxNode, and decide what to do
        # when more than one node is connected.
        return True
502