First CCN RMs working example for Linux
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
26
27 import os
28
29 # TODO: Resolve wildcards in commands!!
30
31
32 @clsinit
33 class LinuxApplication(ResourceManager):
34     _rtype = "LinuxApplication"
35
36     @classmethod
37     def _register_attributes(cls):
38         command = Attribute("command", "Command to execute", 
39                 flags = Flags.ExecReadOnly)
40         forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
41                 flags = Flags.ExecReadOnly)
42         env = Attribute("env", "Environment variables string for command execution",
43                 flags = Flags.ExecReadOnly)
44         sudo = Attribute("sudo", "Run with root privileges", 
45                 flags = Flags.ExecReadOnly)
46         depends = Attribute("depends", 
47                 "Space-separated list of packages required to run the application",
48                 flags = Flags.ExecReadOnly)
49         sources = Attribute("sources", 
50                 "Space-separated list of regular files to be deployed in the working "
51                 "path prior to building. Archives won't be expanded automatically.",
52                 flags = Flags.ExecReadOnly)
53         code = Attribute("code", 
54                 "Plain text source code to be uploaded to the server. It will be stored "
55                 "under ${SOURCES}/code",
56                 flags = Flags.ExecReadOnly)
57         build = Attribute("build", 
58                 "Build commands to execute after deploying the sources. "
59                 "Sources will be in the ${SOURCES} folder. "
60                 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
61                 "Try to make the commands return with a nonzero exit code on error.\n"
62                 "Also, do not install any programs here, use the 'install' attribute. This will "
63                 "help keep the built files constrained to the build folder (which may "
64                 "not be the home folder), and will result in faster deployment. Also, "
65                 "make sure to clean up temporary files, to reduce bandwidth usage between "
66                 "nodes when transferring built packages.",
67                 flags = Flags.ReadOnly)
68         install = Attribute("install", 
69                 "Commands to transfer built files to their final destinations. "
70                 "Sources will be in the initial working folder, and a special "
71                 "tag ${SOURCES} can be used to reference the experiment's "
72                 "home folder (where the application commands will run).\n"
73                 "ALL sources and targets needed for execution must be copied there, "
74                 "if building has been enabled.\n"
75                 "That is, 'slave' nodes will not automatically get any source files. "
76                 "'slave' nodes don't get build dependencies either, so if you need "
77                 "make and other tools to install, be sure to provide them as "
78                 "actual dependencies instead.",
79                 flags = Flags.ReadOnly)
80         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
81         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
82         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
83         tear_down = Attribute("tearDown", "Bash script to be executed before "
84                 "releasing the resource", 
85                 flags = Flags.ReadOnly)
86
87         cls._register_attribute(command)
88         cls._register_attribute(forward_x11)
89         cls._register_attribute(env)
90         cls._register_attribute(sudo)
91         cls._register_attribute(depends)
92         cls._register_attribute(sources)
93         cls._register_attribute(code)
94         cls._register_attribute(build)
95         cls._register_attribute(install)
96         cls._register_attribute(stdin)
97         cls._register_attribute(stdout)
98         cls._register_attribute(stderr)
99         cls._register_attribute(tear_down)
100
101     @classmethod
102     def _register_traces(cls):
103         stdout = Trace("stdout", "Standard output stream")
104         stderr = Trace("stderr", "Standard error stream")
105
106         cls._register_trace(stdout)
107         cls._register_trace(stderr)
108
109     def __init__(self, ec, guid):
110         super(LinuxApplication, self).__init__(ec, guid)
111         self._pid = None
112         self._ppid = None
113         self._home = "app-%s" % self.guid
114
115         # keep a reference to the running process handler when 
116         # the command is not executed as remote daemon in background
117         self._proc = None
118
119         # timestamp of last state check of the application
120         self._last_state_check = strfnow()
121     
122     def log_message(self, msg):
123         return " guid %d - host %s - %s " % (self.guid, 
124                 self.node.get("hostname"), msg)
125
126     @property
127     def node(self):
128         node = self.get_connected(LinuxNode.rtype())
129         if node: return node[0]
130         return None
131
132     @property
133     def app_home(self):
134         return os.path.join(self.node.exp_home, self._home)
135
136     @property
137     def src_dir(self):
138         return os.path.join(self.app_home, 'src')
139
140     @property
141     def build_dir(self):
142         return os.path.join(self.app_home, 'build')
143
144     @property
145     def pid(self):
146         return self._pid
147
148     @property
149     def ppid(self):
150         return self._ppid
151
152     @property
153     def in_foreground(self):
154         """ Returns True if the command needs to be executed in foreground.
155         This means that command will be executed using 'execute' instead of
156         'run' ('run' executes a command in background and detached from the 
157         terminal)
158
159         When using X11 forwarding option, the command can not run in background
160         and detached from a terminal, since we need to keep the terminal attached 
161         to interact with it.
162         """
163         return self.get("forwardX11") or False
164
165     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
166         self.info("Retrieving '%s' trace %s " % (name, attr))
167
168         path = os.path.join(self.app_home, name)
169         
170         command = "(test -f %s && echo 'success') || echo 'error'" % path
171         (out, err), proc = self.node.execute(command)
172
173         if (err and proc.poll()) or out.find("error") != -1:
174             msg = " Couldn't find trace %s " % name
175             self.error(msg, out, err)
176             return None
177     
178         if attr == TraceAttr.PATH:
179             return path
180
181         if attr == TraceAttr.ALL:
182             (out, err), proc = self.node.check_output(self.app_home, name)
183             
184             if err and proc.poll():
185                 msg = " Couldn't read trace %s " % name
186                 self.error(msg, out, err)
187                 return None
188
189             return out
190
191         if attr == TraceAttr.STREAM:
192             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
193         elif attr == TraceAttr.SIZE:
194             cmd = "stat -c%%s %s " % path
195
196         (out, err), proc = self.node.execute(cmd)
197
198         if err and proc.poll():
199             msg = " Couldn't find trace %s " % name
200             self.error(msg, out, err)
201             return None
202         
203         if attr == TraceAttr.SIZE:
204             out = int(out.strip())
205
206         return out
207             
208     def provision(self):
209         # create home dir for application
210         self.node.mkdir(self.app_home)
211
212         # upload sources
213         self.upload_sources()
214
215         # upload code
216         self.upload_code()
217
218         # upload stdin
219         self.upload_stdin()
220
221         # install dependencies
222         self.install_dependencies()
223
224         # build
225         self.build()
226
227         # Install
228         self.install()
229
230         # Upload command to remote bash script
231         # - only if command can be executed in background and detached
232         command = self.get("command")
233
234         if command and not self.in_foreground:
235             self.info("Uploading command '%s'" % command)
236
237             # replace application specific paths in the command
238             command = self.replace_paths(command)
239             
240             # replace application specific paths in the environment
241             env = self.get("env")
242             env = env and self.replace_paths(env)
243
244             self.node.upload_command(command, self.app_home, 
245                     shfile = "app.sh",
246                     env = env)
247        
248         self.info("Provisioning finished")
249
250         super(LinuxApplication, self).provision()
251
252     def upload_sources(self):
253         sources = self.get("sources")
254         if sources:
255             self.info("Uploading sources ")
256
257             # create dir for sources
258             self.node.mkdir(self.src_dir)
259
260             sources = sources.split(' ')
261
262             http_sources = list()
263             for source in list(sources):
264                 if source.startswith("http") or source.startswith("https"):
265                     http_sources.append(source)
266                     sources.remove(source)
267
268             # Download http sources remotely
269             if http_sources:
270                 command = [" wget -c --directory-prefix=${SOURCES} "]
271                 check = []
272
273                 for source in http_sources:
274                     command.append(" %s " % (source))
275                     check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
276                 
277                 command = " ".join(command)
278                 check = " ; ".join(check)
279
280                 # Append the command to check that the sources were downloaded
281                 command += " ; %s " % check
282
283                 # replace application specific paths in the command
284                 command = self.replace_paths(command)
285                 
286                 # Upload the command to a bash script and run it
287                 # in background ( but wait until the command has
288                 # finished to continue )
289                 self.node.run_and_wait(command, self.app_home,
290                         shfile = "http_sources.sh",
291                         pidfile = "http_sources_pidfile", 
292                         ecodefile = "http_sources_exitcode", 
293                         stdout = "http_sources_stdout", 
294                         stderr = "http_sources_stderr")
295
296             if sources:
297                 self.node.upload(sources, self.src_dir)
298
299     def upload_code(self):
300         code = self.get("code")
301         if code:
302             # create dir for sources
303             self.node.mkdir(self.src_dir)
304
305             self.info("Uploading code ")
306
307             dst = os.path.join(self.src_dir, "code")
308             self.node.upload(sources, dst, text = True)
309
310     def upload_stdin(self):
311         stdin = self.get("stdin")
312         if stdin:
313             # create dir for sources
314             self.info(" Uploading stdin ")
315
316             dst = os.path.join(self.app_home, "stdin")
317             self.node.upload(stdin, dst, text = True)
318
319     def install_dependencies(self):
320         depends = self.get("depends")
321         if depends:
322             self.info("Installing dependencies %s" % depends)
323             self.node.install_packages(depends, self.app_home)
324
325     def build(self):
326         build = self.get("build")
327         if build:
328             self.info("Building sources ")
329             
330             # create dir for build
331             self.node.mkdir(self.build_dir)
332
333             # replace application specific paths in the command
334             command = self.replace_paths(build)
335
336             # Upload the command to a bash script and run it
337             # in background ( but wait until the command has
338             # finished to continue )
339             self.node.run_and_wait(command, self.app_home,
340                     shfile = "build.sh",
341                     pidfile = "build_pidfile", 
342                     ecodefile = "build_exitcode", 
343                     stdout = "build_stdout", 
344                     stderr = "build_stderr")
345  
346     def install(self):
347         install = self.get("install")
348         if install:
349             self.info("Installing sources ")
350
351             # replace application specific paths in the command
352             command = self.replace_paths(install)
353
354             # Upload the command to a bash script and run it
355             # in background ( but wait until the command has
356             # finished to continue )
357             self.node.run_and_wait(command, self.app_home,
358                     shfile = "install.sh",
359                     pidfile = "install_pidfile", 
360                     ecodefile = "install_exitcode", 
361                     stdout = "install_stdout", 
362                     stderr = "install_stderr")
363
364     def deploy(self):
365         # Wait until node is associated and deployed
366         node = self.node
367         if not node or node.state < ResourceState.READY:
368             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
369             
370             reschedule_delay = "0.5s"
371             self.ec.schedule(reschedule_delay, self.deploy)
372         else:
373             try:
374                 command = self.get("command") or ""
375                 self.info("Deploying command '%s' " % command)
376                 self.discover()
377                 self.provision()
378             except:
379                 self._state = ResourceState.FAILED
380                 raise
381
382             super(LinuxApplication, self).deploy()
383
384     def start(self):
385         command = self.get("command")
386
387         self.info("Starting command '%s'" % command)
388
389         if not command:
390             # If no command was given (i.e. Application was used for dependency
391             # installation), then the application is directly marked as FINISHED
392             self._state = ResourceState.FINISHED
393         else:
394             if self.in_foreground:
395                 self._start_in_foreground()
396             else:
397                 self._start_in_background()
398
399             super(LinuxApplication, self).start()
400
401     def _start_in_foreground(self):
402         command = self.get("command")
403         env = self.get("env")
404         stdin = "stdin" if self.get("stdin") else None
405         sudo = self.get("sudo") or False
406         x11 = self.get("forwardX11")
407
408         # Command will be launched in foreground and attached to the
409         # terminal using the node 'execute' in non blocking mode.
410
411         # Export environment
412         environ = self.node.format_environment(env, inline = True)
413         command = environ + command
414         command = self.replace_paths(command)
415
416         # We save the reference to the process in self._proc 
417         # to be able to kill the process from the stop method.
418         # We also set blocking = False, since we don't want the
419         # thread to block until the execution finishes.
420         (out, err), self._proc = self.node.execute(command,
421                 sudo = sudo,
422                 stdin = stdin,
423                 forward_x11 = x11,
424                 blocking = False)
425
426         if self._proc.poll():
427             self._state = ResourceState.FAILED
428             self.error(msg, out, err)
429             raise RuntimeError, msg
430
431     def _start_in_background(self):
432         command = self.get("command")
433         env = self.get("env")
434         stdin = "stdin" if self.get("stdin") else None
435         stdout = "stdout" if self.get("stdout") else "stdout"
436         stderr = "stderr" if self.get("stderr") else "stderr"
437         sudo = self.get("sudo") or False
438
439         # Command will be as a daemon in baground and detached from any terminal.
440         # The real command to run was previously uploaded to a bash script
441         # during deployment, now launch the remote script using 'run'
442         # method from the node
443         cmd = "bash ./app.sh"
444         (out, err), proc = self.node.run(cmd, self.app_home, 
445             stdin = stdin, 
446             stdout = stdout,
447             stderr = stderr,
448             sudo = sudo)
449
450         # check if execution errors occurred
451         msg = " Failed to start command '%s' " % command
452         
453         if proc.poll():
454             self._state = ResourceState.FAILED
455             self.error(msg, out, err)
456             raise RuntimeError, msg
457     
458         # Wait for pid file to be generated
459         pid, ppid = self.node.wait_pid(self.app_home)
460         if pid: self._pid = int(pid)
461         if ppid: self._ppid = int(ppid)
462
463         # If the process is not running, check for error information
464         # on the remote machine
465         if not self.pid or not self.ppid:
466             (out, err), proc = self.check_errors(home, ecodefile, stderr)
467
468             # Out is what was written in the stderr file
469             if err:
470                 self._state = ResourceState.FAILED
471                 msg = " Failed to start command '%s' " % command
472                 self.error(msg, out, err)
473                 raise RuntimeError, msg
474         
475     def stop(self):
476         """ Stops application execution
477         """
478         command = self.get('command') or ''
479
480         if self.state == ResourceState.STARTED:
481             stopped = True
482
483             self.info("Stopping command '%s'" % command)
484         
485             # If the command is running in foreground (it was launched using
486             # the node 'execute' method), then we use the handler to the Popen
487             # process to kill it. Else we send a kill signal using the pid and ppid
488             # retrieved after running the command with the node 'run' method
489
490             if self._proc:
491                 self._proc.kill()
492             else:
493                 (out, err), proc = self.node.kill(self.pid, self.ppid)
494
495                 if out or err:
496                     # check if execution errors occurred
497                     msg = " Failed to STOP command '%s' " % self.get("command")
498                     self.error(msg, out, err)
499                     self._state = ResourceState.FAILED
500                     stopped = False
501
502             if stopped:
503                 super(LinuxApplication, self).stop()
504
505     def release(self):
506         self.info("Releasing resource")
507
508         tear_down = self.get("tearDown")
509         if tear_down:
510             self.node.execute(tear_down)
511
512         self.stop()
513
514         if self.state == ResourceState.STOPPED:
515             super(LinuxApplication, self).release()
516     
517     @property
518     def state(self):
519         """ Returns the state of the application
520         """
521         if self._state == ResourceState.STARTED:
522             if self.in_foreground:
523                 # Check if the process we used to execute the command
524                 # is still running ...
525                 retcode = self._proc.poll()
526                 
527                 # retcode == None -> running
528                 # retcode > 0 -> error
529                 # retcode == 0 -> finished
530                 if retcode:
531                     out = ""
532                     msg = " Failed to execute command '%s'" % self.get("command")
533                     err = self._proc.stderr.read()
534                     self.error(msg, out, err)
535                     self._state = ResourceState.FAILED
536                 elif retcode == 0:
537                     self._state = ResourceState.FINISHED
538
539             else:
540                 # We need to query the status of the command we launched in 
541                 # background. In oredr to avoid overwhelming the remote host and
542                 # the local processor with too many ssh queries, the state is only
543                 # requested every 'state_check_delay' seconds.
544                 state_check_delay = 0.5
545                 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
546                     # check if execution errors occurred
547                     (out, err), proc = self.node.check_errors(self.app_home)
548
549                     if err:
550                         msg = " Failed to execute command '%s'" % self.get("command")
551                         self.error(msg, out, err)
552                         self._state = ResourceState.FAILED
553
554                     elif self.pid and self.ppid:
555                         # No execution errors occurred. Make sure the background
556                         # process with the recorded pid is still running.
557                         status = self.node.status(self.pid, self.ppid)
558
559                         if status == ProcStatus.FINISHED:
560                             self._state = ResourceState.FINISHED
561
562                     self._last_state_check = strfnow()
563
564         return self._state
565
566     def replace_paths(self, command):
567         """
568         Replace all special path tags with shell-escaped actual paths.
569         """
570         def absolute_dir(d):
571             return d if d.startswith("/") else os.path.join("${HOME}", d)
572
573         return ( command
574             .replace("${SOURCES}", absolute_dir(self.src_dir))
575             .replace("${BUILD}", absolute_dir(self.build_dir))
576             .replace("${APP_HOME}", absolute_dir(self.app_home))
577             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
578             .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
579             )
580         
581     def valid_connection(self, guid):
582         # TODO: Validate!
583         return True
584