stdout was taking always the default value in LinuxNode check_errors
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33
34 # TODO: Verify files and dirs exists already
35 # TODO: Blacklist nodes!
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 reschedule_delay = "0.5s"
40
41 class ExitCode:
42     """
43     Error codes that the rexitcode function can return if unable to
44     check the exit code of a spawned process
45     """
46     FILENOTFOUND = -1
47     CORRUPTFILE = -2
48     ERROR = -3
49     OK = 0
50
51 class OSType:
52     """
53     Supported flavors of Linux OS
54     """
55     FEDORA_12 = "f12"
56     FEDORA_14 = "f14"
57     FEDORA = "fedora"
58     UBUNTU = "ubuntu"
59     DEBIAN = "debian"
60
61 @clsinit
62 class LinuxNode(ResourceManager):
63     _rtype = "LinuxNode"
64
65     @classmethod
66     def _register_attributes(cls):
67         hostname = Attribute("hostname", "Hostname of the machine",
68                 flags = Flags.ExecReadOnly)
69
70         username = Attribute("username", "Local account username", 
71                 flags = Flags.Credential)
72
73         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
74         
75         home = Attribute("home",
76                 "Experiment home directory to store all experiment related files",
77                 flags = Flags.ExecReadOnly)
78         
79         identity = Attribute("identity", "SSH identity file",
80                 flags = Flags.Credential)
81         
82         server_key = Attribute("serverKey", "Server public key", 
83                 flags = Flags.ExecReadOnly)
84         
85         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
86                 " from home folder before starting experiment", 
87                 flags = Flags.ExecReadOnly)
88         
89         clean_processes = Attribute("cleanProcesses", 
90                 "Kill all running processes before starting experiment",
91                 flags = Flags.ExecReadOnly)
92         
93         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
94                 "releasing the resource",
95                 flags = Flags.ExecReadOnly)
96
97         cls._register_attribute(hostname)
98         cls._register_attribute(username)
99         cls._register_attribute(port)
100         cls._register_attribute(home)
101         cls._register_attribute(identity)
102         cls._register_attribute(server_key)
103         cls._register_attribute(clean_home)
104         cls._register_attribute(clean_processes)
105         cls._register_attribute(tear_down)
106
107     def __init__(self, ec, guid):
108         super(LinuxNode, self).__init__(ec, guid)
109         self._os = None
110         
111         # lock to avoid concurrency issues on methods used by applications 
112         self._lock = threading.Lock()
113     
114     def log_message(self, msg):
115         return " guid %d - host %s - %s " % (self.guid, 
116                 self.get("hostname"), msg)
117
118     @property
119     def home(self):
120         return self.get("home") or ""
121
122     @property
123     def exp_home(self):
124         return os.path.join(self.home, self.ec.exp_id)
125
126     @property
127     def node_home(self):
128         node_home = "node-%d" % self.guid
129         return os.path.join(self.exp_home, node_home)
130
131     @property
132     def os(self):
133         if self._os:
134             return self._os
135
136         if (not self.get("hostname") or not self.get("username")):
137             msg = "Can't resolve OS, insufficient data "
138             self.error(msg)
139             raise RuntimeError, msg
140
141         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
142
143         if err and proc.poll():
144             msg = "Error detecting OS "
145             self.error(msg, out, err)
146             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
147
148         if out.find("Fedora release 12") == 0:
149             self._os = OSType.FEDORA_12
150         elif out.find("Fedora release 14") == 0:
151             self._os = OSType.FEDORA_14
152         elif out.find("Debian") == 0: 
153             self._os = OSType.DEBIAN
154         elif out.find("Ubuntu") ==0:
155             self._os = OSType.UBUNTU
156         else:
157             msg = "Unsupported OS"
158             self.error(msg, out)
159             raise RuntimeError, "%s - %s " %( msg, out )
160
161         return self._os
162
163     @property
164     def localhost(self):
165         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
166
167     def provision(self):
168         if not self.is_alive():
169             self._state = ResourceState.FAILED
170             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
171             self.error(msg)
172             raise RuntimeError, msg
173
174         if self.get("cleanProcesses"):
175             self.clean_processes()
176
177         if self.get("cleanHome"):
178             self.clean_home()
179        
180         self.mkdir(self.node_home)
181
182         super(LinuxNode, self).provision()
183
184     def deploy(self):
185         if self.state == ResourceState.NEW:
186             try:
187                 self.discover()
188                 self.provision()
189             except:
190                 self._state = ResourceState.FAILED
191                 raise
192
193         # Node needs to wait until all associated interfaces are 
194         # ready before it can finalize deployment
195         from nepi.resources.linux.interface import LinuxInterface
196         ifaces = self.get_connected(LinuxInterface.rtype())
197         for iface in ifaces:
198             if iface.state < ResourceState.READY:
199                 self.ec.schedule(reschedule_delay, self.deploy)
200                 return 
201
202         super(LinuxNode, self).deploy()
203
204     def release(self):
205         tear_down = self.get("tearDown")
206         if tear_down:
207             self.execute(tear_down)
208
209         super(LinuxNode, self).release()
210
211     def valid_connection(self, guid):
212         # TODO: Validate!
213         return True
214
215     def clean_processes(self, killer = False):
216         self.info("Cleaning up processes")
217         
218         if killer:
219             # Hardcore kill
220             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
221                 "sudo -S killall python tcpdump || /bin/true ; " +
222                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
223                 "sudo -S killall -u root || /bin/true ; " +
224                 "sudo -S killall -u root || /bin/true ; ")
225         else:
226             # Be gentler...
227             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
228                 "sudo -S killall tcpdump || /bin/true ; " +
229                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
230                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
231
232         out = err = ""
233         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
234             
235     def clean_home(self):
236         self.info("Cleaning up home")
237         
238         cmd = (
239             # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
240             "find . -maxdepth 1 -name 'nepi-*' " +
241             " -execdir rm -rf {} + "
242             )
243             
244         if self.home:
245             cmd = "cd %s ; " % self.home + cmd
246
247         out = err = ""
248         (out, err), proc = self.execute(cmd, with_lock = True)
249
250     def upload(self, src, dst, text = False):
251         """ Copy content to destination
252
253            src  content to copy. Can be a local file, directory or a list of files
254
255            dst  destination path on the remote host (remote is always self.host)
256
257            text src is text input, it must be stored into a temp file before uploading
258         """
259         # If source is a string input 
260         f = None
261         if text and not os.path.isfile(src):
262             # src is text input that should be uploaded as file
263             # create a temporal file with the content to upload
264             f = tempfile.NamedTemporaryFile(delete=False)
265             f.write(src)
266             f.close()
267             src = f.name
268
269         if not self.localhost:
270             # Build destination as <user>@<server>:<path>
271             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
272         result = self.copy(src, dst)
273
274         # clean up temp file
275         if f:
276             os.remove(f.name)
277
278         return result
279
280     def download(self, src, dst):
281         if not self.localhost:
282             # Build destination as <user>@<server>:<path>
283             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
284         return self.copy(src, dst)
285
286     def install_packages(self, packages, home):
287         command = ""
288         if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
289             command = rpmfuncs.install_packages_command(self.os, packages)
290         elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
291             command = debfuncs.install_packages_command(self.os, packages)
292         else:
293             msg = "Error installing packages ( OS not known ) "
294             self.error(msg, self.os)
295             raise RuntimeError, msg
296
297         out = err = ""
298         (out, err), proc = self.run_and_wait(command, home, 
299             shfile = "instpkg.sh",
300             pidfile = "instpkg_pidfile",
301             ecodefile = "instpkg_exitcode",
302             stdout = "instpkg_stdout", 
303             stderr = "instpkg_stderr",
304             raise_on_error = True)
305
306         return (out, err), proc 
307
308     def remove_packages(self, packages, home):
309         command = ""
310         if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
311             command = rpmfuncs.remove_packages_command(self.os, packages)
312         elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
313             command = debfuncs.remove_packages_command(self.os, packages)
314         else:
315             msg = "Error removing packages ( OS not known ) "
316             self.error(msg)
317             raise RuntimeError, msg
318
319         out = err = ""
320         (out, err), proc = self.run_and_wait(command, home, 
321             shfile = "rmpkg.sh",
322             pidfile = "rmpkg_pidfile",
323             ecodefile = "rmpkg_exitcode",
324             stdout = "rmpkg_stdout", 
325             stderr = "rmpkg_stderr",
326             raise_on_error = True)
327          
328         return (out, err), proc 
329
330     def mkdir(self, path, clean = False):
331         if clean:
332             self.rmdir(path)
333
334         return self.execute("mkdir -p %s" % path, with_lock = True)
335
336     def rmdir(self, path):
337         return self.execute("rm -rf %s" % path, with_lock = True)
338         
339     def run_and_wait(self, command, home, 
340             shfile = "cmd.sh",
341             env = None,
342             pidfile = "pidfile", 
343             ecodefile = "exitcode", 
344             stdin = None, 
345             stdout = "stdout", 
346             stderr = "stderr", 
347             sudo = False,
348             tty = False,
349             raise_on_error = False):
350         """ 
351         runs a command in background on the remote host, busy-waiting
352         until the command finishes execution.
353         This is more robust than doing a simple synchronized 'execute',
354         since in the remote host the command can continue to run detached
355         even if network disconnections occur
356         """
357         self.upload_command(command, home, 
358             shfile = shfile, 
359             ecodefile = ecodefile, 
360             env = env)
361
362         command = "bash ./%s" % shfile
363         # run command in background in remote host
364         (out, err), proc = self.run(command, home, 
365                 pidfile = pidfile,
366                 stdin = stdin, 
367                 stdout = stdout, 
368                 stderr = stderr, 
369                 sudo = sudo,
370                 tty = tty)
371
372         # check no errors occurred
373         if proc.poll():
374             msg = " Failed to run command '%s' " % command
375             self.error(msg, out, err)
376             if raise_on_error:
377                 raise RuntimeError, msg
378
379         # Wait for pid file to be generated
380         pid, ppid = self.wait_pid(
381                 home = home, 
382                 pidfile = pidfile, 
383                 raise_on_error = raise_on_error)
384
385         # wait until command finishes to execute
386         self.wait_run(pid, ppid)
387       
388         (out, err), proc = self.check_errors(home,
389             ecodefile = ecodefile,
390             stdout = stdout,
391             stderr= stderr)
392
393         # Out is what was written in the stderr file
394         if err:
395             msg = " Failed to run command '%s' " % command
396             self.error(msg, out, err)
397
398             if raise_on_error:
399                 raise RuntimeError, msg
400         
401         return (out, err), proc
402
403     def exitcode(self, home, ecodefile = "exitcode"):
404         """
405         Get the exit code of an application.
406         Returns an integer value with the exit code 
407         """
408         (out, err), proc = self.check_output(home, ecodefile)
409
410         # Succeeded to open file, return exit code in the file
411         if proc.wait() == 0:
412             try:
413                 return int(out.strip())
414             except:
415                 # Error in the content of the file!
416                 return ExitCode.CORRUPTFILE
417
418         # No such file or directory
419         if proc.returncode == 1:
420             return ExitCode.FILENOTFOUND
421         
422         # Other error from 'cat'
423         return ExitCode.ERROR
424
425     def upload_command(self, command, home, 
426             shfile = "cmd.sh",
427             ecodefile = "exitcode",
428             env = None):
429         """ Saves the command as a bash script file in the remote host, and
430         forces to save the exit code of the command execution to the ecodefile
431         """
432       
433         # The exit code of the command will be stored in ecodefile
434         command = " %(command)s ; echo $? > %(ecodefile)s ;" % {
435                 'command': command,
436                 'ecodefile': ecodefile,
437                 } 
438
439         # Export environment
440         environ = self.format_environment(env)
441
442         # Add environ to command
443         command = environ + command
444
445         dst = os.path.join(home, shfile)
446         return self.upload(command, dst, text = True)
447
448     def format_environment(self, env, inline = False):
449         """Format environmental variables for command to be executed either
450         as an inline command (i.e. PYTHONPATH=src/.. python script.py) or
451         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
452         """
453         sep = " " if inline else "\n"
454         export = " " if inline else "export"
455         return sep.join(map(lambda e: "%s %s" % (export, e),
456             env.strip().split(" "))) + sep if env else ""
457
458     def check_errors(self, home, 
459             ecodefile = "exitcode", 
460             stdout = "stdout",
461             stderr = "stderr"):
462         """
463         Checks whether errors occurred while running a command.
464         It first checks the exit code for the command, and only if the
465         exit code is an error one it returns the error output.
466
467         """
468         proc = None
469         err = ""
470         # retrive standard output from the file
471         (out, oerr), oproc = self.check_output(home, stdout)
472
473         # get exit code saved in the 'exitcode' file
474         ecode = self.exitcode(home, ecodefile)
475
476         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
477             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
478         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
479             # The process returned an error code or didn't exist. 
480             # Check standard error.
481             (err, eerr), proc = self.check_output(home, stderr)
482
483             # If the stderr file was not found, assume nothing bad happened,
484             # and just ignore the error.
485             # (cat returns 1 for error "No such file or directory")
486             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
487                 err = "" 
488             
489         return (out, err), proc
490  
491     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
492         """ Waits until the pid file for the command is generated, 
493             and returns the pid and ppid of the process """
494         pid = ppid = None
495         delay = 1.0
496
497         for i in xrange(4):
498             pidtuple = self.getpid(home = home, pidfile = pidfile)
499             
500             if pidtuple:
501                 pid, ppid = pidtuple
502                 break
503             else:
504                 time.sleep(delay)
505                 delay = delay * 1.5
506         else:
507             msg = " Failed to get pid for pidfile %s/%s " % (
508                     home, pidfile )
509             self.error(msg)
510             
511             if raise_on_error:
512                 raise RuntimeError, msg
513
514         return pid, ppid
515
516     def wait_run(self, pid, ppid, trial = 0):
517         """ wait for a remote process to finish execution """
518         start_delay = 1.0
519
520         while True:
521             status = self.status(pid, ppid)
522             
523             if status is ProcStatus.FINISHED:
524                 break
525             elif status is not ProcStatus.RUNNING:
526                 delay = delay * 1.5
527                 time.sleep(delay)
528                 # If it takes more than 20 seconds to start, then
529                 # asume something went wrong
530                 if delay > 20:
531                     break
532             else:
533                 # The app is running, just wait...
534                 time.sleep(0.5)
535
536     def check_output(self, home, filename):
537         """ Retrives content of file """
538         (out, err), proc = self.execute("cat %s" % 
539             os.path.join(home, filename), retry = 1, with_lock = True)
540         return (out, err), proc
541
542     def is_alive(self):
543         if self.localhost:
544             return True
545
546         out = err = ""
547         try:
548             # TODO: FIX NOT ALIVE!!!!
549             (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
550                     with_lock = True)
551         except:
552             import traceback
553             trace = traceback.format_exc()
554             msg = "Unresponsive host  %s " % err
555             self.error(msg, out, trace)
556             return False
557
558         if out.strip().startswith('ALIVE'):
559             return True
560         else:
561             msg = "Unresponsive host "
562             self.error(msg, out, err)
563             return False
564
565     def copy(self, src, dst):
566         if self.localhost:
567             (out, err), proc = execfuncs.lcopy(source, dest, 
568                     recursive = True,
569                     strict_host_checking = False)
570         else:
571             with self._lock:
572                 (out, err), proc = sshfuncs.rcopy(
573                     src, dst, 
574                     port = self.get("port"),
575                     identity = self.get("identity"),
576                     server_key = self.get("serverKey"),
577                     recursive = True,
578                     strict_host_checking = False)
579
580         return (out, err), proc
581
582     def execute(self, command,
583             sudo = False,
584             stdin = None, 
585             env = None,
586             tty = False,
587             forward_x11 = False,
588             timeout = None,
589             retry = 3,
590             err_on_timeout = True,
591             connect_timeout = 30,
592             strict_host_checking = False,
593             persistent = True,
594             blocking = True,
595             with_lock = False
596             ):
597         """ Notice that this invocation will block until the
598         execution finishes. If this is not the desired behavior,
599         use 'run' instead."""
600
601         if self.localhost:
602             (out, err), proc = execfuncs.lexec(command, 
603                     user = user,
604                     sudo = sudo,
605                     stdin = stdin,
606                     env = env)
607         else:
608             if with_lock:
609                 with self._lock:
610                     (out, err), proc = sshfuncs.rexec(
611                         command, 
612                         host = self.get("hostname"),
613                         user = self.get("username"),
614                         port = self.get("port"),
615                         agent = True,
616                         sudo = sudo,
617                         stdin = stdin,
618                         identity = self.get("identity"),
619                         server_key = self.get("serverKey"),
620                         env = env,
621                         tty = tty,
622                         forward_x11 = forward_x11,
623                         timeout = timeout,
624                         retry = retry,
625                         err_on_timeout = err_on_timeout,
626                         connect_timeout = connect_timeout,
627                         persistent = persistent,
628                         blocking = blocking, 
629                         strict_host_checking = strict_host_checking
630                         )
631             else:
632                 (out, err), proc = sshfuncs.rexec(
633                     command, 
634                     host = self.get("hostname"),
635                     user = self.get("username"),
636                     port = self.get("port"),
637                     agent = True,
638                     sudo = sudo,
639                     stdin = stdin,
640                     identity = self.get("identity"),
641                     server_key = self.get("serverKey"),
642                     env = env,
643                     tty = tty,
644                     forward_x11 = forward_x11,
645                     timeout = timeout,
646                     retry = retry,
647                     err_on_timeout = err_on_timeout,
648                     connect_timeout = connect_timeout,
649                     persistent = persistent,
650                     blocking = blocking, 
651                     strict_host_checking = strict_host_checking
652                     )
653
654         return (out, err), proc
655
656     def run(self, command, home,
657             create_home = False,
658             pidfile = 'pidfile',
659             stdin = None, 
660             stdout = 'stdout', 
661             stderr = 'stderr', 
662             sudo = False,
663             tty = False):
664         
665         self.debug("Running command '%s'" % command)
666         
667         if self.localhost:
668             (out, err), proc = execfuncs.lspawn(command, pidfile, 
669                     stdout = stdout, 
670                     stderr = stderr, 
671                     stdin = stdin, 
672                     home = home, 
673                     create_home = create_home, 
674                     sudo = sudo,
675                     user = user) 
676         else:
677             with self._lock:
678                 (out, err), proc = sshfuncs.rspawn(
679                     command,
680                     pidfile = pidfile,
681                     home = home,
682                     create_home = create_home,
683                     stdin = stdin if stdin is not None else '/dev/null',
684                     stdout = stdout if stdout else '/dev/null',
685                     stderr = stderr if stderr else '/dev/null',
686                     sudo = sudo,
687                     host = self.get("hostname"),
688                     user = self.get("username"),
689                     port = self.get("port"),
690                     agent = True,
691                     identity = self.get("identity"),
692                     server_key = self.get("serverKey"),
693                     tty = tty
694                     )
695
696         return (out, err), proc
697
698     def getpid(self, home, pidfile = "pidfile"):
699         if self.localhost:
700             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
701         else:
702             with self._lock:
703                 pidtuple = sshfuncs.rgetpid(
704                     os.path.join(home, pidfile),
705                     host = self.get("hostname"),
706                     user = self.get("username"),
707                     port = self.get("port"),
708                     agent = True,
709                     identity = self.get("identity"),
710                     server_key = self.get("serverKey")
711                     )
712         
713         return pidtuple
714
715     def status(self, pid, ppid):
716         if self.localhost:
717             status = execfuncs.lstatus(pid, ppid)
718         else:
719             with self._lock:
720                 status = sshfuncs.rstatus(
721                         pid, ppid,
722                         host = self.get("hostname"),
723                         user = self.get("username"),
724                         port = self.get("port"),
725                         agent = True,
726                         identity = self.get("identity"),
727                         server_key = self.get("serverKey")
728                         )
729            
730         return status
731     
732     def kill(self, pid, ppid, sudo = False):
733         out = err = ""
734         proc = None
735         status = self.status(pid, ppid)
736
737         if status == sshfuncs.ProcStatus.RUNNING:
738             if self.localhost:
739                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
740             else:
741                 with self._lock:
742                     (out, err), proc = sshfuncs.rkill(
743                         pid, ppid,
744                         host = self.get("hostname"),
745                         user = self.get("username"),
746                         port = self.get("port"),
747                         agent = True,
748                         sudo = sudo,
749                         identity = self.get("identity"),
750                         server_key = self.get("serverKey")
751                         )
752
753         return (out, err), proc
754