Adding trace Collector RM
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23     reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: compare_hash for all files that are uploaded!
33
34
35 @clsinit
36 class LinuxApplication(ResourceManager):
37     _rtype = "LinuxApplication"
38
39     @classmethod
40     def _register_attributes(cls):
41         command = Attribute("command", "Command to execute", 
42                 flags = Flags.ExecReadOnly)
43         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
44                 flags = Flags.ExecReadOnly)
45         env = Attribute("env", "Environment variables string for command execution",
46                 flags = Flags.ExecReadOnly)
47         sudo = Attribute("sudo", "Run with root privileges", 
48                 flags = Flags.ExecReadOnly)
49         depends = Attribute("depends", 
50                 "Space-separated list of packages required to run the application",
51                 flags = Flags.ExecReadOnly)
52         sources = Attribute("sources", 
53                 "Space-separated list of regular files to be deployed in the working "
54                 "path prior to building. Archives won't be expanded automatically.",
55                 flags = Flags.ExecReadOnly)
56         code = Attribute("code", 
57                 "Plain text source code to be uploaded to the server. It will be stored "
58                 "under ${SOURCES}/code",
59                 flags = Flags.ExecReadOnly)
60         build = Attribute("build", 
61                 "Build commands to execute after deploying the sources. "
62                 "Sources will be in the ${SOURCES} folder. "
63                 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
64                 "Try to make the commands return with a nonzero exit code on error.\n"
65                 "Also, do not install any programs here, use the 'install' attribute. This will "
66                 "help keep the built files constrained to the build folder (which may "
67                 "not be the home folder), and will result in faster deployment. Also, "
68                 "make sure to clean up temporary files, to reduce bandwidth usage between "
69                 "nodes when transferring built packages.",
70                 flags = Flags.ReadOnly)
71         install = Attribute("install", 
72                 "Commands to transfer built files to their final destinations. "
73                 "Sources will be in the initial working folder, and a special "
74                 "tag ${SOURCES} can be used to reference the experiment's "
75                 "home folder (where the application commands will run).\n"
76                 "ALL sources and targets needed for execution must be copied there, "
77                 "if building has been enabled.\n"
78                 "That is, 'slave' nodes will not automatically get any source files. "
79                 "'slave' nodes don't get build dependencies either, so if you need "
80                 "make and other tools to install, be sure to provide them as "
81                 "actual dependencies instead.",
82                 flags = Flags.ReadOnly)
83         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
84         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
85         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
86         tear_down = Attribute("tearDown", "Bash script to be executed before "
87                 "releasing the resource", 
88                 flags = Flags.ReadOnly)
89
90         cls._register_attribute(command)
91         cls._register_attribute(forward_x11)
92         cls._register_attribute(env)
93         cls._register_attribute(sudo)
94         cls._register_attribute(depends)
95         cls._register_attribute(sources)
96         cls._register_attribute(code)
97         cls._register_attribute(build)
98         cls._register_attribute(install)
99         cls._register_attribute(stdin)
100         cls._register_attribute(stdout)
101         cls._register_attribute(stderr)
102         cls._register_attribute(tear_down)
103
104     @classmethod
105     def _register_traces(cls):
106         stdout = Trace("stdout", "Standard output stream")
107         stderr = Trace("stderr", "Standard error stream")
108
109         cls._register_trace(stdout)
110         cls._register_trace(stderr)
111
112     def __init__(self, ec, guid):
113         super(LinuxApplication, self).__init__(ec, guid)
114         self._pid = None
115         self._ppid = None
116         self._home = "app-%s" % self.guid
117         self._in_foreground = False
118
119         # keep a reference to the running process handler when 
120         # the command is not executed as remote daemon in background
121         self._proc = None
122
123         # timestamp of last state check of the application
124         self._last_state_check = tnow()
125     
126     def log_message(self, msg):
127         return " guid %d - host %s - %s " % (self.guid, 
128                 self.node.get("hostname"), msg)
129
130     @property
131     def node(self):
132         node = self.get_connected(LinuxNode.rtype())
133         if node: return node[0]
134         return None
135
136     @property
137     def app_home(self):
138         return os.path.join(self.node.exp_home, self._home)
139
140     @property
141     def src_dir(self):
142         return os.path.join(self.app_home, 'src')
143
144     @property
145     def build_dir(self):
146         return os.path.join(self.app_home, 'build')
147
148     @property
149     def pid(self):
150         return self._pid
151
152     @property
153     def ppid(self):
154         return self._ppid
155
156     @property
157     def in_foreground(self):
158         """ Returns True if the command needs to be executed in foreground.
159         This means that command will be executed using 'execute' instead of
160         'run' ('run' executes a command in background and detached from the 
161         terminal)
162         
163         When using X11 forwarding option, the command can not run in background
164         and detached from a terminal, since we need to keep the terminal attached 
165         to interact with it.
166         """
167         return self.get("forwardX11") or self._in_foreground
168
169     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
170         self.info("Retrieving '%s' trace %s " % (name, attr))
171
172         path = os.path.join(self.app_home, name)
173         
174         command = "(test -f %s && echo 'success') || echo 'error'" % path
175         (out, err), proc = self.node.execute(command)
176
177         if (err and proc.poll()) or out.find("error") != -1:
178             msg = " Couldn't find trace %s " % name
179             self.error(msg, out, err)
180             return None
181     
182         if attr == TraceAttr.PATH:
183             return path
184
185         if attr == TraceAttr.ALL:
186             (out, err), proc = self.node.check_output(self.app_home, name)
187             
188             if err and proc.poll():
189                 msg = " Couldn't read trace %s " % name
190                 self.error(msg, out, err)
191                 return None
192
193             return out
194
195         if attr == TraceAttr.STREAM:
196             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
197         elif attr == TraceAttr.SIZE:
198             cmd = "stat -c%%s %s " % path
199
200         (out, err), proc = self.node.execute(cmd)
201
202         if err and proc.poll():
203             msg = " Couldn't find trace %s " % name
204             self.error(msg, out, err)
205             return None
206         
207         if attr == TraceAttr.SIZE:
208             out = int(out.strip())
209
210         return out
211             
212     def provision(self):
213         # create home dir for application
214         self.node.mkdir(self.app_home)
215
216         # upload sources
217         self.upload_sources()
218
219         # upload code
220         self.upload_code()
221
222         # upload stdin
223         self.upload_stdin()
224
225         # install dependencies
226         self.install_dependencies()
227
228         # build
229         self.build()
230
231         # Install
232         self.install()
233
234         # Upload command to remote bash script
235         # - only if command can be executed in background and detached
236         command = self.get("command")
237
238         if command and not self.in_foreground:
239             self.info("Uploading command '%s'" % command)
240
241             # replace application specific paths in the command
242             command = self.replace_paths(command)
243             
244             # replace application specific paths in the environment
245             env = self.get("env")
246             env = env and self.replace_paths(env)
247
248             self.node.upload_command(command, self.app_home, 
249                     shfile = "app.sh",
250                     env = env)
251        
252         self.info("Provisioning finished")
253
254         super(LinuxApplication, self).provision()
255
256     def upload_sources(self):
257         sources = self.get("sources")
258         if sources:
259             self.info("Uploading sources ")
260
261             # create dir for sources
262             self.node.mkdir(self.src_dir)
263
264             sources = sources.split(' ')
265
266             http_sources = list()
267             for source in list(sources):
268                 if source.startswith("http") or source.startswith("https"):
269                     http_sources.append(source)
270                     sources.remove(source)
271
272             # Download http sources remotely
273             if http_sources:
274                 command = [" wget -c --directory-prefix=${SOURCES} "]
275                 check = []
276
277                 for source in http_sources:
278                     command.append(" %s " % (source))
279                     check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
280                 
281                 command = " ".join(command)
282                 check = " ; ".join(check)
283
284                 # Append the command to check that the sources were downloaded
285                 command += " ; %s " % check
286
287                 # replace application specific paths in the command
288                 command = self.replace_paths(command)
289                 
290                 # Upload the command to a bash script and run it
291                 # in background ( but wait until the command has
292                 # finished to continue )
293                 self.node.run_and_wait(command, self.app_home,
294                         shfile = "http_sources.sh",
295                         pidfile = "http_sources_pidfile", 
296                         ecodefile = "http_sources_exitcode", 
297                         stdout = "http_sources_stdout", 
298                         stderr = "http_sources_stderr")
299
300             if sources:
301                 self.node.upload(sources, self.src_dir)
302
303     def upload_code(self):
304         code = self.get("code")
305         if code:
306             # create dir for sources
307             self.node.mkdir(self.src_dir)
308
309             self.info("Uploading code ")
310
311             dst = os.path.join(self.src_dir, "code")
312             self.node.upload(sources, dst, text = True)
313
314     def upload_stdin(self):
315         stdin = self.get("stdin")
316         if stdin:
317             # create dir for sources
318             self.info(" Uploading stdin ")
319             
320             dst = os.path.join(self.app_home, "stdin")
321
322             # If what we are uploading is a file, check whether
323             # the same file already exists (using md5sum)
324             if self.compare_hash(stdin, dst):
325                 return
326
327             self.node.upload(stdin, dst, text = True)
328
329     def install_dependencies(self):
330         depends = self.get("depends")
331         if depends:
332             self.info("Installing dependencies %s" % depends)
333             self.node.install_packages(depends, self.app_home)
334
335     def build(self):
336         build = self.get("build")
337         if build:
338             self.info("Building sources ")
339             
340             # create dir for build
341             self.node.mkdir(self.build_dir)
342
343             # replace application specific paths in the command
344             command = self.replace_paths(build)
345
346             # Upload the command to a bash script and run it
347             # in background ( but wait until the command has
348             # finished to continue )
349             self.node.run_and_wait(command, self.app_home,
350                     shfile = "build.sh",
351                     pidfile = "build_pidfile", 
352                     ecodefile = "build_exitcode", 
353                     stdout = "build_stdout", 
354                     stderr = "build_stderr")
355  
356     def install(self):
357         install = self.get("install")
358         if install:
359             self.info("Installing sources ")
360
361             # replace application specific paths in the command
362             command = self.replace_paths(install)
363
364             # Upload the command to a bash script and run it
365             # in background ( but wait until the command has
366             # finished to continue )
367             self.node.run_and_wait(command, self.app_home,
368                     shfile = "install.sh",
369                     pidfile = "install_pidfile", 
370                     ecodefile = "install_exitcode", 
371                     stdout = "install_stdout", 
372                     stderr = "install_stderr")
373
374     def deploy(self):
375         # Wait until node is associated and deployed
376         node = self.node
377         if not node or node.state < ResourceState.READY:
378             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
379             self.ec.schedule(reschedule_delay, self.deploy)
380         else:
381             try:
382                 command = self.get("command") or ""
383                 self.info("Deploying command '%s' " % command)
384                 self.discover()
385                 self.provision()
386             except:
387                 self._state = ResourceState.FAILED
388                 raise
389
390             super(LinuxApplication, self).deploy()
391
392     def start(self):
393         command = self.get("command")
394
395         self.info("Starting command '%s'" % command)
396
397         if not command:
398             # If no command was given (i.e. Application was used for dependency
399             # installation), then the application is directly marked as FINISHED
400             self._state = ResourceState.FINISHED
401         else:
402
403             if self.in_foreground:
404                 self._start_in_foreground()
405             else:
406                 self._start_in_background()
407
408             super(LinuxApplication, self).start()
409
410     def _start_in_foreground(self):
411         command = self.get("command")
412         stdin = "stdin" if self.get("stdin") else None
413         sudo = self.get("sudo") or False
414         x11 = self.get("forwardX11")
415
416         # Command will be launched in foreground and attached to the
417         # terminal using the node 'execute' in non blocking mode.
418
419         # Export environment
420         env = self.get("env")
421         environ = self.node.format_environment(env, inline = True)
422         command = environ + command
423         command = self.replace_paths(command)
424
425         # We save the reference to the process in self._proc 
426         # to be able to kill the process from the stop method.
427         # We also set blocking = False, since we don't want the
428         # thread to block until the execution finishes.
429         (out, err), self._proc = self.node.execute(command,
430                 sudo = sudo,
431                 stdin = stdin,
432                 forward_x11 = x11,
433                 blocking = False)
434
435         if self._proc.poll():
436             self._state = ResourceState.FAILED
437             self.error(msg, out, err)
438             raise RuntimeError, msg
439
440     def _start_in_background(self):
441         command = self.get("command")
442         env = self.get("env")
443         stdin = "stdin" if self.get("stdin") else None
444         stdout = "stdout" if self.get("stdout") else "stdout"
445         stderr = "stderr" if self.get("stderr") else "stderr"
446         sudo = self.get("sudo") or False
447
448         # Command will be as a daemon in baground and detached from any terminal.
449         # The real command to run was previously uploaded to a bash script
450         # during deployment, now launch the remote script using 'run'
451         # method from the node
452         cmd = "bash ./app.sh"
453         (out, err), proc = self.node.run(cmd, self.app_home, 
454             stdin = stdin, 
455             stdout = stdout,
456             stderr = stderr,
457             sudo = sudo)
458
459         # check if execution errors occurred
460         msg = " Failed to start command '%s' " % command
461         
462         if proc.poll():
463             self._state = ResourceState.FAILED
464             self.error(msg, out, err)
465             raise RuntimeError, msg
466     
467         # Wait for pid file to be generated
468         pid, ppid = self.node.wait_pid(self.app_home)
469         if pid: self._pid = int(pid)
470         if ppid: self._ppid = int(ppid)
471
472         # If the process is not running, check for error information
473         # on the remote machine
474         if not self.pid or not self.ppid:
475             (out, err), proc = self.node.check_errors(self.app_home,
476                     stderr = stderr) 
477
478             # Out is what was written in the stderr file
479             if err:
480                 self._state = ResourceState.FAILED
481                 msg = " Failed to start command '%s' " % command
482                 self.error(msg, out, err)
483                 raise RuntimeError, msg
484         
485     def stop(self):
486         """ Stops application execution
487         """
488         command = self.get('command') or ''
489
490         if self.state == ResourceState.STARTED:
491             stopped = True
492
493             self.info("Stopping command '%s'" % command)
494         
495             # If the command is running in foreground (it was launched using
496             # the node 'execute' method), then we use the handler to the Popen
497             # process to kill it. Else we send a kill signal using the pid and ppid
498             # retrieved after running the command with the node 'run' method
499
500             if self._proc:
501                 self._proc.kill()
502             else:
503                 # Only try to kill the process if the pid and ppid
504                 # were retrieved
505                 if self.pid and self.ppid:
506                     (out, err), proc = self.node.kill(self.pid, self.ppid)
507
508                     if out or err:
509                         # check if execution errors occurred
510                         msg = " Failed to STOP command '%s' " % self.get("command")
511                         self.error(msg, out, err)
512                         self._state = ResourceState.FAILED
513                         stopped = False
514
515             if stopped:
516                 super(LinuxApplication, self).stop()
517
518     def release(self):
519         self.info("Releasing resource")
520
521         tear_down = self.get("tearDown")
522         if tear_down:
523             self.node.execute(tear_down)
524
525         self.stop()
526
527         if self.state == ResourceState.STOPPED:
528             super(LinuxApplication, self).release()
529     
530     @property
531     def state(self):
532         """ Returns the state of the application
533         """
534         if self._state == ResourceState.STARTED:
535             if self.in_foreground:
536                 # Check if the process we used to execute the command
537                 # is still running ...
538                 retcode = self._proc.poll()
539
540                 # retcode == None -> running
541                 # retcode > 0 -> error
542                 # retcode == 0 -> finished
543                 if retcode:
544                     out = ""
545                     msg = " Failed to execute command '%s'" % self.get("command")
546                     err = self._proc.stderr.read()
547                     self.error(msg, out, err)
548                     self._state = ResourceState.FAILED
549                 elif retcode == 0:
550                     self._state = ResourceState.FINISHED
551
552             else:
553                 # We need to query the status of the command we launched in 
554                 # background. In oredr to avoid overwhelming the remote host and
555                 # the local processor with too many ssh queries, the state is only
556                 # requested every 'state_check_delay' seconds.
557                 state_check_delay = 0.5
558                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
559                     # check if execution errors occurred
560                     (out, err), proc = self.node.check_errors(self.app_home)
561
562                     if err:
563                         msg = " Failed to execute command '%s'" % self.get("command")
564                         self.error(msg, out, err)
565                         self._state = ResourceState.FAILED
566
567                     elif self.pid and self.ppid:
568                         # No execution errors occurred. Make sure the background
569                         # process with the recorded pid is still running.
570                         status = self.node.status(self.pid, self.ppid)
571
572                         if status == ProcStatus.FINISHED:
573                             self._state = ResourceState.FINISHED
574
575                     self._last_state_check = tnow()
576
577         return self._state
578
579     def replace_paths(self, command):
580         """
581         Replace all special path tags with shell-escaped actual paths.
582         """
583         def absolute_dir(d):
584             return d if d.startswith("/") else os.path.join("${HOME}", d)
585
586         return ( command
587             .replace("${SOURCES}", absolute_dir(self.src_dir))
588             .replace("${BUILD}", absolute_dir(self.build_dir))
589             .replace("${APP_HOME}", absolute_dir(self.app_home))
590             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
591             .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
592             )
593
594     def compare_hash(self, local, remote):
595         # getting md5sum from remote file
596         (out, err), proc = self.node.execute("md5sum %s " % remote)
597
598         if proc.poll() == 0: #OK
599             if not os.path.isfile(local):
600                 # store to a tmp file
601                 f = tempfile.NamedTemporaryFile()
602                 f.write(local)
603                 f.flush()
604                 local = f.name
605
606             lproc = subprocess.Popen(["md5sum", local],
607                 stdout = subprocess.PIPE,
608                 stderr = subprocess.PIPE) 
609
610             # getting md5sum from local file
611             (lout, lerr) = lproc.communicate()
612
613             # files are the same, no need to upload
614             lchk = lout.strip().split(" ")[0]
615             rchk = out.strip().split(" ")[0]
616
617             msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % (
618                     local, lchk, remote, rchk)
619             self.debug(msg)
620
621             if lchk == rchk:
622                 return True
623
624         return False
625
626     def valid_connection(self, guid):
627         # TODO: Validate!
628         return True
629