More examples and code for Linux CCN RMs
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
26
27 import os
28
29 # TODO: Resolve wildcards in commands!!
30
31
32 @clsinit
33 class LinuxApplication(ResourceManager):
34     _rtype = "LinuxApplication"
35
36     @classmethod
37     def _register_attributes(cls):
38         command = Attribute("command", "Command to execute", 
39                 flags = Flags.ExecReadOnly)
40         forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
41                 flags = Flags.ExecReadOnly)
42         env = Attribute("env", "Environment variables string for command execution",
43                 flags = Flags.ExecReadOnly)
44         sudo = Attribute("sudo", "Run with root privileges", 
45                 flags = Flags.ExecReadOnly)
46         depends = Attribute("depends", 
47                 "Space-separated list of packages required to run the application",
48                 flags = Flags.ExecReadOnly)
49         sources = Attribute("sources", 
50                 "Space-separated list of regular files to be deployed in the working "
51                 "path prior to building. Archives won't be expanded automatically.",
52                 flags = Flags.ExecReadOnly)
53         code = Attribute("code", 
54                 "Plain text source code to be uploaded to the server. It will be stored "
55                 "under ${SOURCES}/code",
56                 flags = Flags.ExecReadOnly)
57         build = Attribute("build", 
58                 "Build commands to execute after deploying the sources. "
59                 "Sources will be in the ${SOURCES} folder. "
60                 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
61                 "Try to make the commands return with a nonzero exit code on error.\n"
62                 "Also, do not install any programs here, use the 'install' attribute. This will "
63                 "help keep the built files constrained to the build folder (which may "
64                 "not be the home folder), and will result in faster deployment. Also, "
65                 "make sure to clean up temporary files, to reduce bandwidth usage between "
66                 "nodes when transferring built packages.",
67                 flags = Flags.ReadOnly)
68         install = Attribute("install", 
69                 "Commands to transfer built files to their final destinations. "
70                 "Sources will be in the initial working folder, and a special "
71                 "tag ${SOURCES} can be used to reference the experiment's "
72                 "home folder (where the application commands will run).\n"
73                 "ALL sources and targets needed for execution must be copied there, "
74                 "if building has been enabled.\n"
75                 "That is, 'slave' nodes will not automatically get any source files. "
76                 "'slave' nodes don't get build dependencies either, so if you need "
77                 "make and other tools to install, be sure to provide them as "
78                 "actual dependencies instead.",
79                 flags = Flags.ReadOnly)
80         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
81         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
82         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
83         tear_down = Attribute("tearDown", "Bash script to be executed before "
84                 "releasing the resource", 
85                 flags = Flags.ReadOnly)
86
87         cls._register_attribute(command)
88         cls._register_attribute(forward_x11)
89         cls._register_attribute(env)
90         cls._register_attribute(sudo)
91         cls._register_attribute(depends)
92         cls._register_attribute(sources)
93         cls._register_attribute(code)
94         cls._register_attribute(build)
95         cls._register_attribute(install)
96         cls._register_attribute(stdin)
97         cls._register_attribute(stdout)
98         cls._register_attribute(stderr)
99         cls._register_attribute(tear_down)
100
101     @classmethod
102     def _register_traces(cls):
103         stdout = Trace("stdout", "Standard output stream")
104         stderr = Trace("stderr", "Standard error stream")
105
106         cls._register_trace(stdout)
107         cls._register_trace(stderr)
108
109     def __init__(self, ec, guid):
110         super(LinuxApplication, self).__init__(ec, guid)
111         self._pid = None
112         self._ppid = None
113         self._home = "app-%s" % self.guid
114         self._in_foreground = False
115
116         # keep a reference to the running process handler when 
117         # the command is not executed as remote daemon in background
118         self._proc = None
119
120         # timestamp of last state check of the application
121         self._last_state_check = strfnow()
122     
123     def log_message(self, msg):
124         return " guid %d - host %s - %s " % (self.guid, 
125                 self.node.get("hostname"), msg)
126
127     @property
128     def node(self):
129         node = self.get_connected(LinuxNode.rtype())
130         if node: return node[0]
131         return None
132
133     @property
134     def app_home(self):
135         return os.path.join(self.node.exp_home, self._home)
136
137     @property
138     def src_dir(self):
139         return os.path.join(self.app_home, 'src')
140
141     @property
142     def build_dir(self):
143         return os.path.join(self.app_home, 'build')
144
145     @property
146     def pid(self):
147         return self._pid
148
149     @property
150     def ppid(self):
151         return self._ppid
152
153     @property
154     def in_foreground(self):
155         """ Returns True if the command needs to be executed in foreground.
156         This means that command will be executed using 'execute' instead of
157         'run' ('run' executes a command in background and detached from the 
158         terminal)
159         
160         When using X11 forwarding option, the command can not run in background
161         and detached from a terminal, since we need to keep the terminal attached 
162         to interact with it.
163         """
164         return self.get("forwardX11") or self._in_foreground
165
166     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
167         self.info("Retrieving '%s' trace %s " % (name, attr))
168
169         path = os.path.join(self.app_home, name)
170         
171         command = "(test -f %s && echo 'success') || echo 'error'" % path
172         (out, err), proc = self.node.execute(command)
173
174         if (err and proc.poll()) or out.find("error") != -1:
175             msg = " Couldn't find trace %s " % name
176             self.error(msg, out, err)
177             return None
178     
179         if attr == TraceAttr.PATH:
180             return path
181
182         if attr == TraceAttr.ALL:
183             (out, err), proc = self.node.check_output(self.app_home, name)
184             
185             if err and proc.poll():
186                 msg = " Couldn't read trace %s " % name
187                 self.error(msg, out, err)
188                 return None
189
190             return out
191
192         if attr == TraceAttr.STREAM:
193             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
194         elif attr == TraceAttr.SIZE:
195             cmd = "stat -c%%s %s " % path
196
197         (out, err), proc = self.node.execute(cmd)
198
199         if err and proc.poll():
200             msg = " Couldn't find trace %s " % name
201             self.error(msg, out, err)
202             return None
203         
204         if attr == TraceAttr.SIZE:
205             out = int(out.strip())
206
207         return out
208             
209     def provision(self):
210         # create home dir for application
211         self.node.mkdir(self.app_home)
212
213         # upload sources
214         self.upload_sources()
215
216         # upload code
217         self.upload_code()
218
219         # upload stdin
220         self.upload_stdin()
221
222         # install dependencies
223         self.install_dependencies()
224
225         # build
226         self.build()
227
228         # Install
229         self.install()
230
231         # Upload command to remote bash script
232         # - only if command can be executed in background and detached
233         command = self.get("command")
234
235         if command and not self.in_foreground:
236             self.info("Uploading command '%s'" % command)
237
238             # replace application specific paths in the command
239             command = self.replace_paths(command)
240             
241             # replace application specific paths in the environment
242             env = self.get("env")
243             env = env and self.replace_paths(env)
244
245             self.node.upload_command(command, self.app_home, 
246                     shfile = "app.sh",
247                     env = env)
248        
249         self.info("Provisioning finished")
250
251         super(LinuxApplication, self).provision()
252
253     def upload_sources(self):
254         sources = self.get("sources")
255         if sources:
256             self.info("Uploading sources ")
257
258             # create dir for sources
259             self.node.mkdir(self.src_dir)
260
261             sources = sources.split(' ')
262
263             http_sources = list()
264             for source in list(sources):
265                 if source.startswith("http") or source.startswith("https"):
266                     http_sources.append(source)
267                     sources.remove(source)
268
269             # Download http sources remotely
270             if http_sources:
271                 command = [" wget -c --directory-prefix=${SOURCES} "]
272                 check = []
273
274                 for source in http_sources:
275                     command.append(" %s " % (source))
276                     check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
277                 
278                 command = " ".join(command)
279                 check = " ; ".join(check)
280
281                 # Append the command to check that the sources were downloaded
282                 command += " ; %s " % check
283
284                 # replace application specific paths in the command
285                 command = self.replace_paths(command)
286                 
287                 # Upload the command to a bash script and run it
288                 # in background ( but wait until the command has
289                 # finished to continue )
290                 self.node.run_and_wait(command, self.app_home,
291                         shfile = "http_sources.sh",
292                         pidfile = "http_sources_pidfile", 
293                         ecodefile = "http_sources_exitcode", 
294                         stdout = "http_sources_stdout", 
295                         stderr = "http_sources_stderr")
296
297             if sources:
298                 self.node.upload(sources, self.src_dir)
299
300     def upload_code(self):
301         code = self.get("code")
302         if code:
303             # create dir for sources
304             self.node.mkdir(self.src_dir)
305
306             self.info("Uploading code ")
307
308             dst = os.path.join(self.src_dir, "code")
309             self.node.upload(sources, dst, text = True)
310
311     def upload_stdin(self):
312         stdin = self.get("stdin")
313         if stdin:
314             # create dir for sources
315             self.info(" Uploading stdin ")
316             
317             dst = os.path.join(self.app_home, "stdin")
318
319             # TODO:
320             # Check wether file already exists and if it exists 
321             # wether the file we want to upload is the same
322             # (using md5sum)
323
324             self.node.upload(stdin, dst, text = True)
325
326     def install_dependencies(self):
327         depends = self.get("depends")
328         if depends:
329             self.info("Installing dependencies %s" % depends)
330             self.node.install_packages(depends, self.app_home)
331
332     def build(self):
333         build = self.get("build")
334         if build:
335             self.info("Building sources ")
336             
337             # create dir for build
338             self.node.mkdir(self.build_dir)
339
340             # replace application specific paths in the command
341             command = self.replace_paths(build)
342
343             # Upload the command to a bash script and run it
344             # in background ( but wait until the command has
345             # finished to continue )
346             self.node.run_and_wait(command, self.app_home,
347                     shfile = "build.sh",
348                     pidfile = "build_pidfile", 
349                     ecodefile = "build_exitcode", 
350                     stdout = "build_stdout", 
351                     stderr = "build_stderr")
352  
353     def install(self):
354         install = self.get("install")
355         if install:
356             self.info("Installing sources ")
357
358             # replace application specific paths in the command
359             command = self.replace_paths(install)
360
361             # Upload the command to a bash script and run it
362             # in background ( but wait until the command has
363             # finished to continue )
364             self.node.run_and_wait(command, self.app_home,
365                     shfile = "install.sh",
366                     pidfile = "install_pidfile", 
367                     ecodefile = "install_exitcode", 
368                     stdout = "install_stdout", 
369                     stderr = "install_stderr")
370
371     def deploy(self):
372         # Wait until node is associated and deployed
373         node = self.node
374         if not node or node.state < ResourceState.READY:
375             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
376             
377             reschedule_delay = "0.5s"
378             self.ec.schedule(reschedule_delay, self.deploy)
379         else:
380             try:
381                 command = self.get("command") or ""
382                 self.info("Deploying command '%s' " % command)
383                 self.discover()
384                 self.provision()
385             except:
386                 self._state = ResourceState.FAILED
387                 raise
388
389             super(LinuxApplication, self).deploy()
390
391     def start(self):
392         command = self.get("command")
393
394         self.info("Starting command '%s'" % command)
395
396         if not command:
397             # If no command was given (i.e. Application was used for dependency
398             # installation), then the application is directly marked as FINISHED
399             self._state = ResourceState.FINISHED
400         else:
401
402             if self.in_foreground:
403                 self._start_in_foreground()
404             else:
405                 self._start_in_background()
406
407             super(LinuxApplication, self).start()
408
409     def _start_in_foreground(self):
410         command = self.get("command")
411         stdin = "stdin" if self.get("stdin") else None
412         sudo = self.get("sudo") or False
413         x11 = self.get("forwardX11")
414
415         # Command will be launched in foreground and attached to the
416         # terminal using the node 'execute' in non blocking mode.
417
418         # Export environment
419         env = self.get("env")
420         environ = self.node.format_environment(env, inline = True)
421         command = environ + command
422         command = self.replace_paths(command)
423
424         # We save the reference to the process in self._proc 
425         # to be able to kill the process from the stop method.
426         # We also set blocking = False, since we don't want the
427         # thread to block until the execution finishes.
428         (out, err), self._proc = self.node.execute(command,
429                 sudo = sudo,
430                 stdin = stdin,
431                 forward_x11 = x11,
432                 blocking = False)
433
434         if self._proc.poll():
435             self._state = ResourceState.FAILED
436             self.error(msg, out, err)
437             raise RuntimeError, msg
438
439     def _start_in_background(self):
440         command = self.get("command")
441         env = self.get("env")
442         stdin = "stdin" if self.get("stdin") else None
443         stdout = "stdout" if self.get("stdout") else "stdout"
444         stderr = "stderr" if self.get("stderr") else "stderr"
445         sudo = self.get("sudo") or False
446
447         # Command will be as a daemon in baground and detached from any terminal.
448         # The real command to run was previously uploaded to a bash script
449         # during deployment, now launch the remote script using 'run'
450         # method from the node
451         cmd = "bash ./app.sh"
452         (out, err), proc = self.node.run(cmd, self.app_home, 
453             stdin = stdin, 
454             stdout = stdout,
455             stderr = stderr,
456             sudo = sudo)
457
458         # check if execution errors occurred
459         msg = " Failed to start command '%s' " % command
460         
461         if proc.poll():
462             self._state = ResourceState.FAILED
463             self.error(msg, out, err)
464             raise RuntimeError, msg
465     
466         # Wait for pid file to be generated
467         pid, ppid = self.node.wait_pid(self.app_home)
468         if pid: self._pid = int(pid)
469         if ppid: self._ppid = int(ppid)
470
471         # If the process is not running, check for error information
472         # on the remote machine
473         if not self.pid or not self.ppid:
474             (out, err), proc = self.node.check_errors(self.app_home,
475                     stderr = stderr) 
476
477             # Out is what was written in the stderr file
478             if err:
479                 self._state = ResourceState.FAILED
480                 msg = " Failed to start command '%s' " % command
481                 self.error(msg, out, err)
482                 raise RuntimeError, msg
483         
484     def stop(self):
485         """ Stops application execution
486         """
487         command = self.get('command') or ''
488
489         if self.state == ResourceState.STARTED:
490             stopped = True
491
492             self.info("Stopping command '%s'" % command)
493         
494             # If the command is running in foreground (it was launched using
495             # the node 'execute' method), then we use the handler to the Popen
496             # process to kill it. Else we send a kill signal using the pid and ppid
497             # retrieved after running the command with the node 'run' method
498
499             if self._proc:
500                 self._proc.kill()
501             else:
502                 # Only try to kill the process if the pid and ppid
503                 # were retrieved
504                 if self.pid and self.ppid:
505                     (out, err), proc = self.node.kill(self.pid, self.ppid)
506
507                     if out or err:
508                         # check if execution errors occurred
509                         msg = " Failed to STOP command '%s' " % self.get("command")
510                         self.error(msg, out, err)
511                         self._state = ResourceState.FAILED
512                         stopped = False
513
514             if stopped:
515                 super(LinuxApplication, self).stop()
516
517     def release(self):
518         self.info("Releasing resource")
519
520         tear_down = self.get("tearDown")
521         if tear_down:
522             self.node.execute(tear_down)
523
524         self.stop()
525
526         if self.state == ResourceState.STOPPED:
527             super(LinuxApplication, self).release()
528     
529     @property
530     def state(self):
531         """ Returns the state of the application
532         """
533         if self._state == ResourceState.STARTED:
534             if self.in_foreground:
535                 # Check if the process we used to execute the command
536                 # is still running ...
537                 retcode = self._proc.poll()
538
539                 # retcode == None -> running
540                 # retcode > 0 -> error
541                 # retcode == 0 -> finished
542                 if retcode:
543                     out = ""
544                     msg = " Failed to execute command '%s'" % self.get("command")
545                     err = self._proc.stderr.read()
546                     self.error(msg, out, err)
547                     self._state = ResourceState.FAILED
548                 elif retcode == 0:
549                     self._state = ResourceState.FINISHED
550
551             else:
552                 # We need to query the status of the command we launched in 
553                 # background. In oredr to avoid overwhelming the remote host and
554                 # the local processor with too many ssh queries, the state is only
555                 # requested every 'state_check_delay' seconds.
556                 state_check_delay = 0.5
557                 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
558                     # check if execution errors occurred
559                     (out, err), proc = self.node.check_errors(self.app_home)
560
561                     if err:
562                         msg = " Failed to execute command '%s'" % self.get("command")
563                         self.error(msg, out, err)
564                         self._state = ResourceState.FAILED
565
566                     elif self.pid and self.ppid:
567                         # No execution errors occurred. Make sure the background
568                         # process with the recorded pid is still running.
569                         status = self.node.status(self.pid, self.ppid)
570
571                         if status == ProcStatus.FINISHED:
572                             self._state = ResourceState.FINISHED
573
574                     self._last_state_check = strfnow()
575
576         return self._state
577
578     def replace_paths(self, command):
579         """
580         Replace all special path tags with shell-escaped actual paths.
581         """
582         def absolute_dir(d):
583             return d if d.startswith("/") else os.path.join("${HOME}", d)
584
585         return ( command
586             .replace("${SOURCES}", absolute_dir(self.src_dir))
587             .replace("${BUILD}", absolute_dir(self.build_dir))
588             .replace("${APP_HOME}", absolute_dir(self.app_home))
589             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
590             .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
591             )
592         
593     def valid_connection(self, guid):
594         # TODO: Validate!
595         return True
596