Improved LinuxApplication behavior
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33
34 # TODO: Verify files and dirs exists already
35 # TODO: Blacklist nodes!
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 reschedule_delay = "0.5s"
40
41 class ExitCode:
42     """
43     Error codes that the rexitcode function can return if unable to
44     check the exit code of a spawned process
45     """
46     FILENOTFOUND = -1
47     CORRUPTFILE = -2
48     ERROR = -3
49     OK = 0
50
51 class OSType:
52     """
53     Supported flavors of Linux OS
54     """
55     FEDORA_12 = "f12"
56     FEDORA_14 = "f14"
57     FEDORA = "fedora"
58     UBUNTU = "ubuntu"
59     DEBIAN = "debian"
60
61 @clsinit
62 class LinuxNode(ResourceManager):
63     _rtype = "LinuxNode"
64
65     @classmethod
66     def _register_attributes(cls):
67         hostname = Attribute("hostname", "Hostname of the machine",
68                 flags = Flags.ExecReadOnly)
69
70         username = Attribute("username", "Local account username", 
71                 flags = Flags.Credential)
72
73         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
74         
75         home = Attribute("home",
76                 "Experiment home directory to store all experiment related files",
77                 flags = Flags.ExecReadOnly)
78         
79         identity = Attribute("identity", "SSH identity file",
80                 flags = Flags.Credential)
81         
82         server_key = Attribute("serverKey", "Server public key", 
83                 flags = Flags.ExecReadOnly)
84         
85         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
86                 " from home folder before starting experiment", 
87                 flags = Flags.ExecReadOnly)
88         
89         clean_processes = Attribute("cleanProcesses", 
90                 "Kill all running processes before starting experiment",
91                 flags = Flags.ExecReadOnly)
92         
93         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
94                 "releasing the resource",
95                 flags = Flags.ExecReadOnly)
96
97         cls._register_attribute(hostname)
98         cls._register_attribute(username)
99         cls._register_attribute(port)
100         cls._register_attribute(home)
101         cls._register_attribute(identity)
102         cls._register_attribute(server_key)
103         cls._register_attribute(clean_home)
104         cls._register_attribute(clean_processes)
105         cls._register_attribute(tear_down)
106
107     def __init__(self, ec, guid):
108         super(LinuxNode, self).__init__(ec, guid)
109         self._os = None
110         
111         # lock to avoid concurrency issues on methods used by applications 
112         self._lock = threading.Lock()
113     
114     def log_message(self, msg):
115         return " guid %d - host %s - %s " % (self.guid, 
116                 self.get("hostname"), msg)
117
118     @property
119     def home(self):
120         return self.get("home") or ""
121
122     @property
123     def exp_home(self):
124         return os.path.join(self.home, self.ec.exp_id)
125
126     @property
127     def node_home(self):
128         node_home = "node-%d" % self.guid
129         return os.path.join(self.exp_home, node_home)
130
131     @property
132     def os(self):
133         if self._os:
134             return self._os
135
136         if (not self.get("hostname") or not self.get("username")):
137             msg = "Can't resolve OS, insufficient data "
138             self.error(msg)
139             raise RuntimeError, msg
140
141         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
142
143         if err and proc.poll():
144             msg = "Error detecting OS "
145             self.error(msg, out, err)
146             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
147
148         if out.find("Fedora release 12") == 0:
149             self._os = OSType.FEDORA_12
150         elif out.find("Fedora release 14") == 0:
151             self._os = OSType.FEDORA_14
152         elif out.find("Debian") == 0: 
153             self._os = OSType.DEBIAN
154         elif out.find("Ubuntu") ==0:
155             self._os = OSType.UBUNTU
156         else:
157             msg = "Unsupported OS"
158             self.error(msg, out)
159             raise RuntimeError, "%s - %s " %( msg, out )
160
161         return self._os
162
163     @property
164     def localhost(self):
165         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
166
167     def provision(self):
168         if not self.is_alive():
169             self._state = ResourceState.FAILED
170             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
171             self.error(msg)
172             raise RuntimeError, msg
173
174         if self.get("cleanProcesses"):
175             self.clean_processes()
176
177         if self.get("cleanHome"):
178             self.clean_home()
179        
180         self.mkdir(self.node_home)
181
182         super(LinuxNode, self).provision()
183
184     def deploy(self):
185         if self.state == ResourceState.NEW:
186             try:
187                 self.discover()
188                 self.provision()
189             except:
190                 self._state = ResourceState.FAILED
191                 raise
192
193         # Node needs to wait until all associated interfaces are 
194         # ready before it can finalize deployment
195         from nepi.resources.linux.interface import LinuxInterface
196         ifaces = self.get_connected(LinuxInterface.rtype())
197         for iface in ifaces:
198             if iface.state < ResourceState.READY:
199                 self.ec.schedule(reschedule_delay, self.deploy)
200                 return 
201
202         super(LinuxNode, self).deploy()
203
204     def release(self):
205         tear_down = self.get("tearDown")
206         if tear_down:
207             self.execute(tear_down)
208
209         super(LinuxNode, self).release()
210
211     def valid_connection(self, guid):
212         # TODO: Validate!
213         return True
214
215     def clean_processes(self, killer = False):
216         self.info("Cleaning up processes")
217         
218         if killer:
219             # Hardcore kill
220             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
221                 "sudo -S killall python tcpdump || /bin/true ; " +
222                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
223                 "sudo -S killall -u root || /bin/true ; " +
224                 "sudo -S killall -u root || /bin/true ; ")
225         else:
226             # Be gentler...
227             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
228                 "sudo -S killall tcpdump || /bin/true ; " +
229                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
230                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
231
232         out = err = ""
233         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
234             
235     def clean_home(self):
236         self.info("Cleaning up home")
237         
238         cmd = (
239             # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
240             "find . -maxdepth 1 -name 'nepi-*' " +
241             " -execdir rm -rf {} + "
242             )
243             
244         if self.home:
245             cmd = "cd %s ; " % self.home + cmd
246
247         out = err = ""
248         (out, err), proc = self.execute(cmd, with_lock = True)
249
250     def upload(self, src, dst, text = False):
251         """ Copy content to destination
252
253            src  content to copy. Can be a local file, directory or a list of files
254
255            dst  destination path on the remote host (remote is always self.host)
256
257            text src is text input, it must be stored into a temp file before uploading
258         """
259         # If source is a string input 
260         f = None
261         if text and not os.path.isfile(src):
262             # src is text input that should be uploaded as file
263             # create a temporal file with the content to upload
264             f = tempfile.NamedTemporaryFile(delete=False)
265             f.write(src)
266             f.close()
267             src = f.name
268
269         if not self.localhost:
270             # Build destination as <user>@<server>:<path>
271             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
272
273         result = self.copy(src, dst)
274
275         # clean up temp file
276         if f:
277             os.remove(f.name)
278
279         return result
280
281     def download(self, src, dst):
282         if not self.localhost:
283             # Build destination as <user>@<server>:<path>
284             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
285         return self.copy(src, dst)
286
287     def install_packages(self, packages, home):
288         command = ""
289         if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
290             command = rpmfuncs.install_packages_command(self.os, packages)
291         elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
292             command = debfuncs.install_packages_command(self.os, packages)
293         else:
294             msg = "Error installing packages ( OS not known ) "
295             self.error(msg, self.os)
296             raise RuntimeError, msg
297
298         out = err = ""
299         (out, err), proc = self.run_and_wait(command, home, 
300             shfile = "instpkg.sh",
301             pidfile = "instpkg_pidfile",
302             ecodefile = "instpkg_exitcode",
303             stdout = "instpkg_stdout", 
304             stderr = "instpkg_stderr",
305             raise_on_error = True)
306
307         return (out, err), proc 
308
309     def remove_packages(self, packages, home):
310         command = ""
311         if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
312             command = rpmfuncs.remove_packages_command(self.os, packages)
313         elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
314             command = debfuncs.remove_packages_command(self.os, packages)
315         else:
316             msg = "Error removing packages ( OS not known ) "
317             self.error(msg)
318             raise RuntimeError, msg
319
320         out = err = ""
321         (out, err), proc = self.run_and_wait(command, home, 
322             shfile = "rmpkg.sh",
323             pidfile = "rmpkg_pidfile",
324             ecodefile = "rmpkg_exitcode",
325             stdout = "rmpkg_stdout", 
326             stderr = "rmpkg_stderr",
327             raise_on_error = True)
328          
329         return (out, err), proc 
330
331     def mkdir(self, path, clean = False):
332         if clean:
333             self.rmdir(path)
334
335         return self.execute("mkdir -p %s" % path, with_lock = True)
336
337     def rmdir(self, path):
338         return self.execute("rm -rf %s" % path, with_lock = True)
339         
340     def run_and_wait(self, command, home, 
341             shfile = "cmd.sh",
342             env = None,
343             pidfile = "pidfile", 
344             ecodefile = "exitcode", 
345             stdin = None, 
346             stdout = "stdout", 
347             stderr = "stderr", 
348             sudo = False,
349             tty = False,
350             raise_on_error = False):
351         """ 
352         runs a command in background on the remote host, busy-waiting
353         until the command finishes execution.
354         This is more robust than doing a simple synchronized 'execute',
355         since in the remote host the command can continue to run detached
356         even if network disconnections occur
357         """
358         self.upload_command(command, home, 
359             shfile = shfile, 
360             ecodefile = ecodefile, 
361             env = env)
362
363         command = "bash ./%s" % shfile
364         # run command in background in remote host
365         (out, err), proc = self.run(command, home, 
366                 pidfile = pidfile,
367                 stdin = stdin, 
368                 stdout = stdout, 
369                 stderr = stderr, 
370                 sudo = sudo,
371                 tty = tty)
372
373         # check no errors occurred
374         if proc.poll() and err:
375             msg = " Failed to run command '%s' " % command
376             self.error(msg, out, err)
377             if raise_on_error:
378                 raise RuntimeError, msg
379
380         # Wait for pid file to be generated
381         pid, ppid = self.wait_pid(
382                 home = home, 
383                 pidfile = pidfile, 
384                 raise_on_error = raise_on_error)
385
386         # wait until command finishes to execute
387         self.wait_run(pid, ppid)
388       
389         (out, err), proc = self.check_errors(home, ecodefile, stderr)
390
391         # Out is what was written in the stderr file
392         if out or err:
393             msg = " Failed to run command '%s' " % command
394             self.error(msg, out, err)
395
396             if raise_on_error:
397                 raise RuntimeError, msg
398         
399         return (out, err), proc
400
401     def exitcode(self, home, ecodefile = "exitcode"):
402         """
403         Get the exit code of an application.
404         Returns an integer value with the exit code 
405         """
406         (out, err), proc = self.check_output(home, ecodefile)
407
408         # Succeeded to open file, return exit code in the file
409         if proc.wait() == 0:
410             try:
411                 return int(out.strip())
412             except:
413                 # Error in the content of the file!
414                 return ExitCode.CORRUPTFILE
415
416         # No such file or directory
417         if proc.returncode == 1:
418             return ExitCode.FILENOTFOUND
419         
420         # Other error from 'cat'
421         return ExitCode.ERROR
422
423     def upload_command(self, command, home, 
424             shfile = "cmd.sh",
425             ecodefile = "exitcode",
426             env = None):
427         """ Saves the command as a bash script file in the remote host, and
428         forces to save the exit code of the command execution to the ecodefile
429         """
430       
431         # Prepare command to be executed as a bash script file
432         # Make sure command ends in ';' so the curly brackets syntax is correct
433         if not command.strip()[-1] == ';':
434             command += " ; "
435
436         # The exit code of the command will be stored in ecodefile
437         command = " { { %(command)s } ; echo $? > %(ecodefile)s ; } " % {
438                 'command': command,
439                 'ecodefile': ecodefile,
440                 } 
441
442         # Export environment
443         environ = "\n".join(map(lambda e: "export %s" % e, env.split(" "))) \
444             if env else ""
445
446         # Add environ to command
447         command = environ + command
448
449         dst = os.path.join(home, shfile)
450         return self.upload(command, dst, text = True)
451
452     def check_errors(self, home, 
453             ecodefile = "exitcode", 
454             stderr = "stderr"):
455         """
456         Checks whether errors occurred while running a command.
457         It first checks the exit code for the command, and only if the
458         exit code is an error one it returns the error output.
459         """
460         out = err = ""
461         proc = None
462
463         # get Exit code
464         ecode = self.exitcode(home, ecodefile)
465
466         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
467             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
468         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
469             # The process returned an error code or didn't exist. 
470             # Check standard error.
471             (out, err), proc = self.check_output(home, stderr)
472             
473             # If the stderr file was not found, assume nothing happened.
474             # We just ignore the error.
475             # (cat returns 1 for error "No such file or directory")
476             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
477                 out = err = ""
478        
479         return (out, err), proc
480  
481     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
482         """ Waits until the pid file for the command is generated, 
483             and returns the pid and ppid of the process """
484         pid = ppid = None
485         delay = 1.0
486
487         for i in xrange(4):
488             pidtuple = self.getpid(home = home, pidfile = pidfile)
489             
490             if pidtuple:
491                 pid, ppid = pidtuple
492                 break
493             else:
494                 time.sleep(delay)
495                 delay = delay * 1.5
496         else:
497             msg = " Failed to get pid for pidfile %s/%s " % (
498                     home, pidfile )
499             self.error(msg)
500             
501             if raise_on_error:
502                 raise RuntimeError, msg
503
504         return pid, ppid
505
506     def wait_run(self, pid, ppid, trial = 0):
507         """ wait for a remote process to finish execution """
508         start_delay = 1.0
509
510         while True:
511             status = self.status(pid, ppid)
512             
513             if status is ProcStatus.FINISHED:
514                 break
515             elif status is not ProcStatus.RUNNING:
516                 delay = delay * 1.5
517                 time.sleep(delay)
518                 # If it takes more than 20 seconds to start, then
519                 # asume something went wrong
520                 if delay > 20:
521                     break
522             else:
523                 # The app is running, just wait...
524                 time.sleep(0.5)
525
526     def check_output(self, home, filename):
527         """ Retrives content of file """
528         (out, err), proc = self.execute("cat %s" % 
529             os.path.join(home, filename), retry = 1, with_lock = True)
530         return (out, err), proc
531
532     def is_alive(self):
533         if self.localhost:
534             return True
535
536         out = err = ""
537         try:
538             # TODO: FIX NOT ALIVE!!!!
539             (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
540                     with_lock = True)
541         except:
542             import traceback
543             trace = traceback.format_exc()
544             msg = "Unresponsive host  %s " % err
545             self.error(msg, out, trace)
546             return False
547
548         if out.strip().startswith('ALIVE'):
549             return True
550         else:
551             msg = "Unresponsive host "
552             self.error(msg, out, err)
553             return False
554
555     def copy(self, src, dst):
556         if self.localhost:
557             (out, err), proc = execfuncs.lcopy(source, dest, 
558                     recursive = True,
559                     strict_host_checking = False)
560         else:
561             with self._lock:
562                 (out, err), proc = sshfuncs.rcopy(
563                     src, dst, 
564                     port = self.get("port"),
565                     identity = self.get("identity"),
566                     server_key = self.get("serverKey"),
567                     recursive = True,
568                     strict_host_checking = False)
569
570         return (out, err), proc
571
572     def execute(self, command,
573             sudo = False,
574             stdin = None, 
575             env = None,
576             tty = False,
577             forward_x11 = False,
578             timeout = None,
579             retry = 3,
580             err_on_timeout = True,
581             connect_timeout = 30,
582             strict_host_checking = False,
583             persistent = True,
584             blocking = True,
585             with_lock = False
586             ):
587         """ Notice that this invocation will block until the
588         execution finishes. If this is not the desired behavior,
589         use 'run' instead."""
590
591         if self.localhost:
592             (out, err), proc = execfuncs.lexec(command, 
593                     user = user,
594                     sudo = sudo,
595                     stdin = stdin,
596                     env = env)
597         else:
598             if with_lock:
599                 with self._lock:
600                     (out, err), proc = sshfuncs.rexec(
601                         command, 
602                         host = self.get("hostname"),
603                         user = self.get("username"),
604                         port = self.get("port"),
605                         agent = True,
606                         sudo = sudo,
607                         stdin = stdin,
608                         identity = self.get("identity"),
609                         server_key = self.get("serverKey"),
610                         env = env,
611                         tty = tty,
612                         forward_x11 = forward_x11,
613                         timeout = timeout,
614                         retry = retry,
615                         err_on_timeout = err_on_timeout,
616                         connect_timeout = connect_timeout,
617                         persistent = persistent,
618                         blocking = blocking, 
619                         strict_host_checking = strict_host_checking
620                         )
621             else:
622                 (out, err), proc = sshfuncs.rexec(
623                     command, 
624                     host = self.get("hostname"),
625                     user = self.get("username"),
626                     port = self.get("port"),
627                     agent = True,
628                     sudo = sudo,
629                     stdin = stdin,
630                     identity = self.get("identity"),
631                     server_key = self.get("serverKey"),
632                     env = env,
633                     tty = tty,
634                     forward_x11 = forward_x11,
635                     timeout = timeout,
636                     retry = retry,
637                     err_on_timeout = err_on_timeout,
638                     connect_timeout = connect_timeout,
639                     persistent = persistent,
640                     blocking = blocking, 
641                     strict_host_checking = strict_host_checking
642                     )
643
644         return (out, err), proc
645
646     def run(self, command, home,
647             create_home = False,
648             pidfile = 'pidfile',
649             stdin = None, 
650             stdout = 'stdout', 
651             stderr = 'stderr', 
652             sudo = False,
653             tty = False):
654         
655         self.debug("Running command '%s'" % command)
656         
657         if self.localhost:
658             (out, err), proc = execfuncs.lspawn(command, pidfile, 
659                     stdout = stdout, 
660                     stderr = stderr, 
661                     stdin = stdin, 
662                     home = home, 
663                     create_home = create_home, 
664                     sudo = sudo,
665                     user = user) 
666         else:
667             with self._lock:
668                 (out, err), proc = sshfuncs.rspawn(
669                     command,
670                     pidfile = pidfile,
671                     home = home,
672                     create_home = create_home,
673                     stdin = stdin if stdin is not None else '/dev/null',
674                     stdout = stdout if stdout else '/dev/null',
675                     stderr = stderr if stderr else '/dev/null',
676                     sudo = sudo,
677                     host = self.get("hostname"),
678                     user = self.get("username"),
679                     port = self.get("port"),
680                     agent = True,
681                     identity = self.get("identity"),
682                     server_key = self.get("serverKey"),
683                     tty = tty
684                     )
685
686         return (out, err), proc
687
688     def getpid(self, home, pidfile = "pidfile"):
689         if self.localhost:
690             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
691         else:
692             with self._lock:
693                 pidtuple = sshfuncs.rgetpid(
694                     os.path.join(home, pidfile),
695                     host = self.get("hostname"),
696                     user = self.get("username"),
697                     port = self.get("port"),
698                     agent = True,
699                     identity = self.get("identity"),
700                     server_key = self.get("serverKey")
701                     )
702         
703         return pidtuple
704
705     def status(self, pid, ppid):
706         if self.localhost:
707             status = execfuncs.lstatus(pid, ppid)
708         else:
709             with self._lock:
710                 status = sshfuncs.rstatus(
711                         pid, ppid,
712                         host = self.get("hostname"),
713                         user = self.get("username"),
714                         port = self.get("port"),
715                         agent = True,
716                         identity = self.get("identity"),
717                         server_key = self.get("serverKey")
718                         )
719            
720         return status
721     
722     def kill(self, pid, ppid, sudo = False):
723         out = err = ""
724         proc = None
725         status = self.status(pid, ppid)
726
727         if status == sshfuncs.ProcStatus.RUNNING:
728             if self.localhost:
729                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
730             else:
731                 with self._lock:
732                     (out, err), proc = sshfuncs.rkill(
733                         pid, ppid,
734                         host = self.get("hostname"),
735                         user = self.get("username"),
736                         port = self.get("port"),
737                         agent = True,
738                         sudo = sudo,
739                         identity = self.get("identity"),
740                         server_key = self.get("serverKey")
741                         )
742
743         return (out, err), proc
744