Adding comments to Linux CCN examples
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
26
27 import os
28 import subprocess
29
30 # TODO: Resolve wildcards in commands!!
31 # TODO: compare_hash for all files that are uploaded!
32
33
34 @clsinit
35 class LinuxApplication(ResourceManager):
36     _rtype = "LinuxApplication"
37
38     @classmethod
39     def _register_attributes(cls):
40         command = Attribute("command", "Command to execute", 
41                 flags = Flags.ExecReadOnly)
42         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
43                 flags = Flags.ExecReadOnly)
44         env = Attribute("env", "Environment variables string for command execution",
45                 flags = Flags.ExecReadOnly)
46         sudo = Attribute("sudo", "Run with root privileges", 
47                 flags = Flags.ExecReadOnly)
48         depends = Attribute("depends", 
49                 "Space-separated list of packages required to run the application",
50                 flags = Flags.ExecReadOnly)
51         sources = Attribute("sources", 
52                 "Space-separated list of regular files to be deployed in the working "
53                 "path prior to building. Archives won't be expanded automatically.",
54                 flags = Flags.ExecReadOnly)
55         code = Attribute("code", 
56                 "Plain text source code to be uploaded to the server. It will be stored "
57                 "under ${SOURCES}/code",
58                 flags = Flags.ExecReadOnly)
59         build = Attribute("build", 
60                 "Build commands to execute after deploying the sources. "
61                 "Sources will be in the ${SOURCES} folder. "
62                 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
63                 "Try to make the commands return with a nonzero exit code on error.\n"
64                 "Also, do not install any programs here, use the 'install' attribute. This will "
65                 "help keep the built files constrained to the build folder (which may "
66                 "not be the home folder), and will result in faster deployment. Also, "
67                 "make sure to clean up temporary files, to reduce bandwidth usage between "
68                 "nodes when transferring built packages.",
69                 flags = Flags.ReadOnly)
70         install = Attribute("install", 
71                 "Commands to transfer built files to their final destinations. "
72                 "Sources will be in the initial working folder, and a special "
73                 "tag ${SOURCES} can be used to reference the experiment's "
74                 "home folder (where the application commands will run).\n"
75                 "ALL sources and targets needed for execution must be copied there, "
76                 "if building has been enabled.\n"
77                 "That is, 'slave' nodes will not automatically get any source files. "
78                 "'slave' nodes don't get build dependencies either, so if you need "
79                 "make and other tools to install, be sure to provide them as "
80                 "actual dependencies instead.",
81                 flags = Flags.ReadOnly)
82         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
83         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
84         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
85         tear_down = Attribute("tearDown", "Bash script to be executed before "
86                 "releasing the resource", 
87                 flags = Flags.ReadOnly)
88
89         cls._register_attribute(command)
90         cls._register_attribute(forward_x11)
91         cls._register_attribute(env)
92         cls._register_attribute(sudo)
93         cls._register_attribute(depends)
94         cls._register_attribute(sources)
95         cls._register_attribute(code)
96         cls._register_attribute(build)
97         cls._register_attribute(install)
98         cls._register_attribute(stdin)
99         cls._register_attribute(stdout)
100         cls._register_attribute(stderr)
101         cls._register_attribute(tear_down)
102
103     @classmethod
104     def _register_traces(cls):
105         stdout = Trace("stdout", "Standard output stream")
106         stderr = Trace("stderr", "Standard error stream")
107
108         cls._register_trace(stdout)
109         cls._register_trace(stderr)
110
111     def __init__(self, ec, guid):
112         super(LinuxApplication, self).__init__(ec, guid)
113         self._pid = None
114         self._ppid = None
115         self._home = "app-%s" % self.guid
116         self._in_foreground = False
117
118         # keep a reference to the running process handler when 
119         # the command is not executed as remote daemon in background
120         self._proc = None
121
122         # timestamp of last state check of the application
123         self._last_state_check = strfnow()
124     
125     def log_message(self, msg):
126         return " guid %d - host %s - %s " % (self.guid, 
127                 self.node.get("hostname"), msg)
128
129     @property
130     def node(self):
131         node = self.get_connected(LinuxNode.rtype())
132         if node: return node[0]
133         return None
134
135     @property
136     def app_home(self):
137         return os.path.join(self.node.exp_home, self._home)
138
139     @property
140     def src_dir(self):
141         return os.path.join(self.app_home, 'src')
142
143     @property
144     def build_dir(self):
145         return os.path.join(self.app_home, 'build')
146
147     @property
148     def pid(self):
149         return self._pid
150
151     @property
152     def ppid(self):
153         return self._ppid
154
155     @property
156     def in_foreground(self):
157         """ Returns True if the command needs to be executed in foreground.
158         This means that command will be executed using 'execute' instead of
159         'run' ('run' executes a command in background and detached from the 
160         terminal)
161         
162         When using X11 forwarding option, the command can not run in background
163         and detached from a terminal, since we need to keep the terminal attached 
164         to interact with it.
165         """
166         return self.get("forwardX11") or self._in_foreground
167
168     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
169         self.info("Retrieving '%s' trace %s " % (name, attr))
170
171         path = os.path.join(self.app_home, name)
172         
173         command = "(test -f %s && echo 'success') || echo 'error'" % path
174         (out, err), proc = self.node.execute(command)
175
176         if (err and proc.poll()) or out.find("error") != -1:
177             msg = " Couldn't find trace %s " % name
178             self.error(msg, out, err)
179             return None
180     
181         if attr == TraceAttr.PATH:
182             return path
183
184         if attr == TraceAttr.ALL:
185             (out, err), proc = self.node.check_output(self.app_home, name)
186             
187             if err and proc.poll():
188                 msg = " Couldn't read trace %s " % name
189                 self.error(msg, out, err)
190                 return None
191
192             return out
193
194         if attr == TraceAttr.STREAM:
195             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
196         elif attr == TraceAttr.SIZE:
197             cmd = "stat -c%%s %s " % path
198
199         (out, err), proc = self.node.execute(cmd)
200
201         if err and proc.poll():
202             msg = " Couldn't find trace %s " % name
203             self.error(msg, out, err)
204             return None
205         
206         if attr == TraceAttr.SIZE:
207             out = int(out.strip())
208
209         return out
210             
211     def provision(self):
212         # create home dir for application
213         self.node.mkdir(self.app_home)
214
215         # upload sources
216         self.upload_sources()
217
218         # upload code
219         self.upload_code()
220
221         # upload stdin
222         self.upload_stdin()
223
224         # install dependencies
225         self.install_dependencies()
226
227         # build
228         self.build()
229
230         # Install
231         self.install()
232
233         # Upload command to remote bash script
234         # - only if command can be executed in background and detached
235         command = self.get("command")
236
237         if command and not self.in_foreground:
238             self.info("Uploading command '%s'" % command)
239
240             # replace application specific paths in the command
241             command = self.replace_paths(command)
242             
243             # replace application specific paths in the environment
244             env = self.get("env")
245             env = env and self.replace_paths(env)
246
247             self.node.upload_command(command, self.app_home, 
248                     shfile = "app.sh",
249                     env = env)
250        
251         self.info("Provisioning finished")
252
253         super(LinuxApplication, self).provision()
254
255     def upload_sources(self):
256         sources = self.get("sources")
257         if sources:
258             self.info("Uploading sources ")
259
260             # create dir for sources
261             self.node.mkdir(self.src_dir)
262
263             sources = sources.split(' ')
264
265             http_sources = list()
266             for source in list(sources):
267                 if source.startswith("http") or source.startswith("https"):
268                     http_sources.append(source)
269                     sources.remove(source)
270
271             # Download http sources remotely
272             if http_sources:
273                 command = [" wget -c --directory-prefix=${SOURCES} "]
274                 check = []
275
276                 for source in http_sources:
277                     command.append(" %s " % (source))
278                     check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
279                 
280                 command = " ".join(command)
281                 check = " ; ".join(check)
282
283                 # Append the command to check that the sources were downloaded
284                 command += " ; %s " % check
285
286                 # replace application specific paths in the command
287                 command = self.replace_paths(command)
288                 
289                 # Upload the command to a bash script and run it
290                 # in background ( but wait until the command has
291                 # finished to continue )
292                 self.node.run_and_wait(command, self.app_home,
293                         shfile = "http_sources.sh",
294                         pidfile = "http_sources_pidfile", 
295                         ecodefile = "http_sources_exitcode", 
296                         stdout = "http_sources_stdout", 
297                         stderr = "http_sources_stderr")
298
299             if sources:
300                 self.node.upload(sources, self.src_dir)
301
302     def upload_code(self):
303         code = self.get("code")
304         if code:
305             # create dir for sources
306             self.node.mkdir(self.src_dir)
307
308             self.info("Uploading code ")
309
310             dst = os.path.join(self.src_dir, "code")
311             self.node.upload(sources, dst, text = True)
312
313     def upload_stdin(self):
314         stdin = self.get("stdin")
315         if stdin:
316             # create dir for sources
317             self.info(" Uploading stdin ")
318             
319             dst = os.path.join(self.app_home, "stdin")
320
321             # If what we are uploading is a file, check whether
322             # the same file already exists (using md5sum)
323             if self.compare_hash(stdin, dst):
324                 return
325
326             self.node.upload(stdin, dst, text = True)
327
328     def install_dependencies(self):
329         depends = self.get("depends")
330         if depends:
331             self.info("Installing dependencies %s" % depends)
332             self.node.install_packages(depends, self.app_home)
333
334     def build(self):
335         build = self.get("build")
336         if build:
337             self.info("Building sources ")
338             
339             # create dir for build
340             self.node.mkdir(self.build_dir)
341
342             # replace application specific paths in the command
343             command = self.replace_paths(build)
344
345             # Upload the command to a bash script and run it
346             # in background ( but wait until the command has
347             # finished to continue )
348             self.node.run_and_wait(command, self.app_home,
349                     shfile = "build.sh",
350                     pidfile = "build_pidfile", 
351                     ecodefile = "build_exitcode", 
352                     stdout = "build_stdout", 
353                     stderr = "build_stderr")
354  
355     def install(self):
356         install = self.get("install")
357         if install:
358             self.info("Installing sources ")
359
360             # replace application specific paths in the command
361             command = self.replace_paths(install)
362
363             # Upload the command to a bash script and run it
364             # in background ( but wait until the command has
365             # finished to continue )
366             self.node.run_and_wait(command, self.app_home,
367                     shfile = "install.sh",
368                     pidfile = "install_pidfile", 
369                     ecodefile = "install_exitcode", 
370                     stdout = "install_stdout", 
371                     stderr = "install_stderr")
372
373     def deploy(self):
374         # Wait until node is associated and deployed
375         node = self.node
376         if not node or node.state < ResourceState.READY:
377             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
378             
379             reschedule_delay = "0.5s"
380             self.ec.schedule(reschedule_delay, self.deploy)
381         else:
382             try:
383                 command = self.get("command") or ""
384                 self.info("Deploying command '%s' " % command)
385                 self.discover()
386                 self.provision()
387             except:
388                 self._state = ResourceState.FAILED
389                 raise
390
391             super(LinuxApplication, self).deploy()
392
393     def start(self):
394         command = self.get("command")
395
396         self.info("Starting command '%s'" % command)
397
398         if not command:
399             # If no command was given (i.e. Application was used for dependency
400             # installation), then the application is directly marked as FINISHED
401             self._state = ResourceState.FINISHED
402         else:
403
404             if self.in_foreground:
405                 self._start_in_foreground()
406             else:
407                 self._start_in_background()
408
409             super(LinuxApplication, self).start()
410
411     def _start_in_foreground(self):
412         command = self.get("command")
413         stdin = "stdin" if self.get("stdin") else None
414         sudo = self.get("sudo") or False
415         x11 = self.get("forwardX11")
416
417         # Command will be launched in foreground and attached to the
418         # terminal using the node 'execute' in non blocking mode.
419
420         # Export environment
421         env = self.get("env")
422         environ = self.node.format_environment(env, inline = True)
423         command = environ + command
424         command = self.replace_paths(command)
425
426         # We save the reference to the process in self._proc 
427         # to be able to kill the process from the stop method.
428         # We also set blocking = False, since we don't want the
429         # thread to block until the execution finishes.
430         (out, err), self._proc = self.node.execute(command,
431                 sudo = sudo,
432                 stdin = stdin,
433                 forward_x11 = x11,
434                 blocking = False)
435
436         if self._proc.poll():
437             self._state = ResourceState.FAILED
438             self.error(msg, out, err)
439             raise RuntimeError, msg
440
441     def _start_in_background(self):
442         command = self.get("command")
443         env = self.get("env")
444         stdin = "stdin" if self.get("stdin") else None
445         stdout = "stdout" if self.get("stdout") else "stdout"
446         stderr = "stderr" if self.get("stderr") else "stderr"
447         sudo = self.get("sudo") or False
448
449         # Command will be as a daemon in baground and detached from any terminal.
450         # The real command to run was previously uploaded to a bash script
451         # during deployment, now launch the remote script using 'run'
452         # method from the node
453         cmd = "bash ./app.sh"
454         (out, err), proc = self.node.run(cmd, self.app_home, 
455             stdin = stdin, 
456             stdout = stdout,
457             stderr = stderr,
458             sudo = sudo)
459
460         # check if execution errors occurred
461         msg = " Failed to start command '%s' " % command
462         
463         if proc.poll():
464             self._state = ResourceState.FAILED
465             self.error(msg, out, err)
466             raise RuntimeError, msg
467     
468         # Wait for pid file to be generated
469         pid, ppid = self.node.wait_pid(self.app_home)
470         if pid: self._pid = int(pid)
471         if ppid: self._ppid = int(ppid)
472
473         # If the process is not running, check for error information
474         # on the remote machine
475         if not self.pid or not self.ppid:
476             (out, err), proc = self.node.check_errors(self.app_home,
477                     stderr = stderr) 
478
479             # Out is what was written in the stderr file
480             if err:
481                 self._state = ResourceState.FAILED
482                 msg = " Failed to start command '%s' " % command
483                 self.error(msg, out, err)
484                 raise RuntimeError, msg
485         
486     def stop(self):
487         """ Stops application execution
488         """
489         command = self.get('command') or ''
490
491         if self.state == ResourceState.STARTED:
492             stopped = True
493
494             self.info("Stopping command '%s'" % command)
495         
496             # If the command is running in foreground (it was launched using
497             # the node 'execute' method), then we use the handler to the Popen
498             # process to kill it. Else we send a kill signal using the pid and ppid
499             # retrieved after running the command with the node 'run' method
500
501             if self._proc:
502                 self._proc.kill()
503             else:
504                 # Only try to kill the process if the pid and ppid
505                 # were retrieved
506                 if self.pid and self.ppid:
507                     (out, err), proc = self.node.kill(self.pid, self.ppid)
508
509                     if out or err:
510                         # check if execution errors occurred
511                         msg = " Failed to STOP command '%s' " % self.get("command")
512                         self.error(msg, out, err)
513                         self._state = ResourceState.FAILED
514                         stopped = False
515
516             if stopped:
517                 super(LinuxApplication, self).stop()
518
519     def release(self):
520         self.info("Releasing resource")
521
522         tear_down = self.get("tearDown")
523         if tear_down:
524             self.node.execute(tear_down)
525
526         self.stop()
527
528         if self.state == ResourceState.STOPPED:
529             super(LinuxApplication, self).release()
530     
531     @property
532     def state(self):
533         """ Returns the state of the application
534         """
535         if self._state == ResourceState.STARTED:
536             if self.in_foreground:
537                 # Check if the process we used to execute the command
538                 # is still running ...
539                 retcode = self._proc.poll()
540
541                 # retcode == None -> running
542                 # retcode > 0 -> error
543                 # retcode == 0 -> finished
544                 if retcode:
545                     out = ""
546                     msg = " Failed to execute command '%s'" % self.get("command")
547                     err = self._proc.stderr.read()
548                     self.error(msg, out, err)
549                     self._state = ResourceState.FAILED
550                 elif retcode == 0:
551                     self._state = ResourceState.FINISHED
552
553             else:
554                 # We need to query the status of the command we launched in 
555                 # background. In oredr to avoid overwhelming the remote host and
556                 # the local processor with too many ssh queries, the state is only
557                 # requested every 'state_check_delay' seconds.
558                 state_check_delay = 0.5
559                 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
560                     # check if execution errors occurred
561                     (out, err), proc = self.node.check_errors(self.app_home)
562
563                     if err:
564                         msg = " Failed to execute command '%s'" % self.get("command")
565                         self.error(msg, out, err)
566                         self._state = ResourceState.FAILED
567
568                     elif self.pid and self.ppid:
569                         # No execution errors occurred. Make sure the background
570                         # process with the recorded pid is still running.
571                         status = self.node.status(self.pid, self.ppid)
572
573                         if status == ProcStatus.FINISHED:
574                             self._state = ResourceState.FINISHED
575
576                     self._last_state_check = strfnow()
577
578         return self._state
579
580     def replace_paths(self, command):
581         """
582         Replace all special path tags with shell-escaped actual paths.
583         """
584         def absolute_dir(d):
585             return d if d.startswith("/") else os.path.join("${HOME}", d)
586
587         return ( command
588             .replace("${SOURCES}", absolute_dir(self.src_dir))
589             .replace("${BUILD}", absolute_dir(self.build_dir))
590             .replace("${APP_HOME}", absolute_dir(self.app_home))
591             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
592             .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
593             )
594
595     def compare_hash(self, local, remote):
596         # getting md5sum from remote file
597         (out, err), proc = self.node.execute("md5sum %s " % remote)
598
599         if proc.poll() == 0: #OK
600             if not os.path.isfile(local):
601                 # store to a tmp file
602                 f = tempfile.NamedTemporaryFile()
603                 f.write(local)
604                 f.flush()
605                 local = f.name
606
607             lproc = subprocess.Popen(["md5sum", local],
608                 stdout = subprocess.PIPE,
609                 stderr = subprocess.PIPE) 
610
611             # getting md5sum from local file
612             (lout, lerr) = lproc.communicate()
613
614             # files are the same, no need to upload
615             lchk = lout.strip().split(" ")[0]
616             rchk = out.strip().split(" ")[0]
617
618             msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % (
619                     local, lchk, remote, rchk)
620             self.debug(msg)
621
622             if lchk == rchk:
623                 return True
624
625         return False
626
627     def valid_connection(self, guid):
628         # TODO: Validate!
629         return True
630