Bugfixes for LinuxApplication and LinuxNode
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33
34 # TODO: Verify files and dirs exists already
35 # TODO: Blacklist nodes!
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 reschedule_delay = "0.5s"
40
41 class ExitCode:
42     """
43     Error codes that the rexitcode function can return if unable to
44     check the exit code of a spawned process
45     """
46     FILENOTFOUND = -1
47     CORRUPTFILE = -2
48     ERROR = -3
49     OK = 0
50
51 class OSType:
52     """
53     Supported flavors of Linux OS
54     """
55     FEDORA_12 = "f12"
56     FEDORA_14 = "f14"
57     FEDORA = "fedora"
58     UBUNTU = "ubuntu"
59     DEBIAN = "debian"
60
61 @clsinit
62 class LinuxNode(ResourceManager):
63     _rtype = "LinuxNode"
64
65     @classmethod
66     def _register_attributes(cls):
67         hostname = Attribute("hostname", "Hostname of the machine",
68                 flags = Flags.ExecReadOnly)
69
70         username = Attribute("username", "Local account username", 
71                 flags = Flags.Credential)
72
73         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
74         
75         home = Attribute("home",
76                 "Experiment home directory to store all experiment related files",
77                 flags = Flags.ExecReadOnly)
78         
79         identity = Attribute("identity", "SSH identity file",
80                 flags = Flags.Credential)
81         
82         server_key = Attribute("serverKey", "Server public key", 
83                 flags = Flags.ExecReadOnly)
84         
85         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
86                 " from home folder before starting experiment", 
87                 flags = Flags.ExecReadOnly)
88         
89         clean_processes = Attribute("cleanProcesses", 
90                 "Kill all running processes before starting experiment",
91                 flags = Flags.ExecReadOnly)
92         
93         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
94                 "releasing the resource",
95                 flags = Flags.ExecReadOnly)
96
97         cls._register_attribute(hostname)
98         cls._register_attribute(username)
99         cls._register_attribute(port)
100         cls._register_attribute(home)
101         cls._register_attribute(identity)
102         cls._register_attribute(server_key)
103         cls._register_attribute(clean_home)
104         cls._register_attribute(clean_processes)
105         cls._register_attribute(tear_down)
106
107     def __init__(self, ec, guid):
108         super(LinuxNode, self).__init__(ec, guid)
109         self._os = None
110         
111         # lock to avoid concurrency issues on methods used by applications 
112         self._lock = threading.Lock()
113     
114     def log_message(self, msg):
115         return " guid %d - host %s - %s " % (self.guid, 
116                 self.get("hostname"), msg)
117
118     @property
119     def home(self):
120         return self.get("home") or ""
121
122     @property
123     def exp_home(self):
124         return os.path.join(self.home, self.ec.exp_id)
125
126     @property
127     def node_home(self):
128         node_home = "node-%d" % self.guid
129         return os.path.join(self.exp_home, node_home)
130
131     @property
132     def os(self):
133         if self._os:
134             return self._os
135
136         if (not self.get("hostname") or not self.get("username")):
137             msg = "Can't resolve OS, insufficient data "
138             self.error(msg)
139             raise RuntimeError, msg
140
141         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
142
143         if err and proc.poll():
144             msg = "Error detecting OS "
145             self.error(msg, out, err)
146             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
147
148         if out.find("Fedora release 12") == 0:
149             self._os = OSType.FEDORA_12
150         elif out.find("Fedora release 14") == 0:
151             self._os = OSType.FEDORA_14
152         elif out.find("Debian") == 0: 
153             self._os = OSType.DEBIAN
154         elif out.find("Ubuntu") ==0:
155             self._os = OSType.UBUNTU
156         else:
157             msg = "Unsupported OS"
158             self.error(msg, out)
159             raise RuntimeError, "%s - %s " %( msg, out )
160
161         return self._os
162
163     @property
164     def localhost(self):
165         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
166
167     def provision(self):
168         if not self.is_alive():
169             self._state = ResourceState.FAILED
170             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
171             self.error(msg)
172             raise RuntimeError, msg
173
174         if self.get("cleanProcesses"):
175             self.clean_processes()
176
177         if self.get("cleanHome"):
178             self.clean_home()
179        
180         self.mkdir(self.node_home)
181
182         super(LinuxNode, self).provision()
183
184     def deploy(self):
185         if self.state == ResourceState.NEW:
186             try:
187                self.discover()
188                self.provision()
189             except:
190                 self._state = ResourceState.FAILED
191                 raise
192
193         # Node needs to wait until all associated interfaces are 
194         # ready before it can finalize deployment
195         from nepi.resources.linux.interface import LinuxInterface
196         ifaces = self.get_connected(LinuxInterface.rtype())
197         for iface in ifaces:
198             if iface.state < ResourceState.READY:
199                 self.ec.schedule(reschedule_delay, self.deploy)
200                 return 
201
202         super(LinuxNode, self).deploy()
203
204     def release(self):
205         tear_down = self.get("tearDown")
206         if tear_down:
207             self.execute(tear_down)
208
209         super(LinuxNode, self).release()
210
211     def valid_connection(self, guid):
212         # TODO: Validate!
213         return True
214
215     def clean_processes(self, killer = False):
216         self.info("Cleaning up processes")
217         
218         if killer:
219             # Hardcore kill
220             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
221                 "sudo -S killall python tcpdump || /bin/true ; " +
222                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
223                 "sudo -S killall -u root || /bin/true ; " +
224                 "sudo -S killall -u root || /bin/true ; ")
225         else:
226             # Be gentler...
227             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
228                 "sudo -S killall tcpdump || /bin/true ; " +
229                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
230                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
231
232         out = err = ""
233         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
234             
235     def clean_home(self):
236         self.info("Cleaning up home")
237         
238         cmd = (
239             # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
240             "find . -maxdepth 1 -name 'nepi-*' " +
241             " -execdir rm -rf {} + "
242             )
243             
244         if self.home:
245             cmd = "cd %s ; " % self.home + cmd
246
247         out = err = ""
248         (out, err), proc = self.execute(cmd, with_lock = True)
249
250     def upload(self, src, dst, text = False):
251         """ Copy content to destination
252
253            src  content to copy. Can be a local file, directory or a list of files
254
255            dst  destination path on the remote host (remote is always self.host)
256
257            text src is text input, it must be stored into a temp file before uploading
258         """
259         # If source is a string input 
260         f = None
261         if text and not os.path.isfile(src):
262             # src is text input that should be uploaded as file
263             # create a temporal file with the content to upload
264             f = tempfile.NamedTemporaryFile(delete=False)
265             f.write(src)
266             f.close()
267             src = f.name
268
269         if not self.localhost:
270             # Build destination as <user>@<server>:<path>
271             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
272
273         result = self.copy(src, dst)
274
275         # clean up temp file
276         if f:
277             os.remove(f.name)
278
279         return result
280
281     def download(self, src, dst):
282         if not self.localhost:
283             # Build destination as <user>@<server>:<path>
284             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
285         return self.copy(src, dst)
286
287     def install_packages(self, packages, home):
288         command = ""
289         if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
290             command = rpmfuncs.install_packages_command(self.os, packages)
291         elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
292             command = debfuncs.install_packages_command(self.os, packages)
293         else:
294             msg = "Error installing packages ( OS not known ) "
295             self.error(msg, self.os)
296             raise RuntimeError, msg
297
298         out = err = ""
299         (out, err), proc = self.run_and_wait(command, home, 
300             shfile = "instpkg.sh",
301             pidfile = "instpkg_pidfile",
302             ecodefile = "instpkg_exitcode",
303             stdout = "instpkg_stdout", 
304             stderr = "instpkg_stderr",
305             raise_on_error = True)
306
307         return (out, err), proc 
308
309     def remove_packages(self, packages, home):
310         command = ""
311         if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
312             command = rpmfuncs.remove_packages_command(self.os, packages)
313         elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
314             command = debfuncs.remove_packages_command(self.os, packages)
315         else:
316             msg = "Error removing packages ( OS not known ) "
317             self.error(msg)
318             raise RuntimeError, msg
319
320         out = err = ""
321         (out, err), proc = self.run_and_wait(command, home, 
322             shfile = "rmpkg.sh",
323             pidfile = "rmpkg_pidfile",
324             ecodefile = "rmpkg_exitcode",
325             stdout = "rmpkg_stdout", 
326             stderr = "rmpkg_stderr",
327             raise_on_error = True)
328          
329         return (out, err), proc 
330
331     def mkdir(self, path, clean = False):
332         if clean:
333             self.rmdir(path)
334
335         return self.execute("mkdir -p %s" % path, with_lock = True)
336
337     def rmdir(self, path):
338         return self.execute("rm -rf %s" % path, with_lock = True)
339         
340     def run_and_wait(self, command, home, 
341             shfile = "cmd.sh",
342             pidfile = "pidfile", 
343             ecodefile = "exitcode", 
344             stdin = None, 
345             stdout = "stdout", 
346             stderr = "stderr", 
347             sudo = False,
348             tty = False,
349             raise_on_error = False):
350         """ 
351         runs a command in background on the remote host, busy-waiting
352         until the command finishes execution.
353         This is more robust than doing a simple synchronized 'execute',
354         since in the remote host the command can continue to run detached
355         even if network disconnections occur
356         """
357         self.upload_command(command, home, shfile, ecodefile)
358
359         command = "bash ./%s" % shfile
360         # run command in background in remote host
361         (out, err), proc = self.run(command, home, 
362                 pidfile = pidfile,
363                 stdin = stdin, 
364                 stdout = stdout, 
365                 stderr = stderr, 
366                 sudo = sudo,
367                 tty = tty)
368
369         # check no errors occurred
370         if proc.poll() and err:
371             msg = " Failed to run command '%s' " % command
372             self.error(msg, out, err)
373             if raise_on_error:
374                 raise RuntimeError, msg
375
376         # Wait for pid file to be generated
377         pid, ppid = self.wait_pid(
378                 home = home, 
379                 pidfile = pidfile, 
380                 raise_on_error = raise_on_error)
381
382         # wait until command finishes to execute
383         self.wait_run(pid, ppid)
384       
385         (out, err), proc = self.check_errors(home, ecodefile, stderr)
386
387         # Out is what was written in the stderr file
388         if out or err:
389             msg = " Failed to run command '%s' " % command
390             self.error(msg, out, err)
391
392             if raise_on_error:
393                 raise RuntimeError, msg
394         
395         return (out, err), proc
396
397     def exitcode(self, home, ecodefile = "exitcode"):
398         """
399         Get the exit code of an application.
400         Returns an integer value with the exit code 
401         """
402         (out, err), proc = self.check_output(home, ecodefile)
403
404         # Succeeded to open file, return exit code in the file
405         if proc.wait() == 0:
406             try:
407                 return int(out.strip())
408             except:
409                 # Error in the content of the file!
410                 return ExitCode.CORRUPTFILE
411
412         # No such file or directory
413         if proc.returncode == 1:
414             return ExitCode.FILENOTFOUND
415         
416         # Other error from 'cat'
417         return ExitCode.ERROR
418
419     def upload_command(self, command, home, 
420             shfile = "cmd.sh",
421             ecodefile = "exitcode",
422             env = None):
423
424         command = " ( %(command)s ) ; echo $? > %(ecodefile)s " % {
425                 'command': command,
426                 'ecodefile': ecodefile,
427                 } 
428
429         # Export environment
430         environ = ""
431         if env:
432             for var in env.split(" "):
433                 environ += 'export %s\n' % var
434
435         command = environ + command
436
437         dst = os.path.join(home, shfile)
438         return self.upload(command, dst, text = True)
439
440     def check_errors(self, home, 
441             ecodefile = "exitcode", 
442             stderr = "stderr"):
443         """
444         Checks whether errors occurred while running a command.
445         It first checks the exit code for the command, and only if the
446         exit code is an error one it returns the error output.
447         """
448         out = err = ""
449         proc = None
450
451         # get Exit code
452         ecode = self.exitcode(home, ecodefile)
453
454         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
455             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
456         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
457             # The process returned an error code or didn't exist. 
458             # Check standard error.
459             (out, err), proc = self.check_output(home, stderr)
460             
461             # If the stderr file was not found, assume nothing happened.
462             # We just ignore the error.
463             # (cat returns 1 for error "No such file or directory")
464             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
465                 out = err = ""
466        
467         return (out, err), proc
468  
469     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
470         """ Waits until the pid file for the command is generated, 
471             and returns the pid and ppid of the process """
472         pid = ppid = None
473         delay = 1.0
474
475         for i in xrange(4):
476             pidtuple = self.getpid(home = home, pidfile = pidfile)
477             
478             if pidtuple:
479                 pid, ppid = pidtuple
480                 break
481             else:
482                 time.sleep(delay)
483                 delay = delay * 1.5
484         else:
485             msg = " Failed to get pid for pidfile %s/%s " % (
486                     home, pidfile )
487             self.error(msg)
488             
489             if raise_on_error:
490                 raise RuntimeError, msg
491
492         return pid, ppid
493
494     def wait_run(self, pid, ppid, trial = 0):
495         """ wait for a remote process to finish execution """
496         start_delay = 1.0
497
498         while True:
499             status = self.status(pid, ppid)
500             
501             if status is ProcStatus.FINISHED:
502                 break
503             elif status is not ProcStatus.RUNNING:
504                 delay = delay * 1.5
505                 time.sleep(delay)
506                 # If it takes more than 20 seconds to start, then
507                 # asume something went wrong
508                 if delay > 20:
509                     break
510             else:
511                 # The app is running, just wait...
512                 time.sleep(0.5)
513
514     def check_output(self, home, filename):
515         """ Retrives content of file """
516         (out, err), proc = self.execute("cat %s" % 
517             os.path.join(home, filename), retry = 1, with_lock = True)
518         return (out, err), proc
519
520     def is_alive(self):
521         if self.localhost:
522             return True
523
524         out = err = ""
525         try:
526             # TODO: FIX NOT ALIVE!!!!
527             (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
528                     with_lock = True)
529         except:
530             import traceback
531             trace = traceback.format_exc()
532             msg = "Unresponsive host  %s " % err
533             self.error(msg, out, trace)
534             return False
535
536         if out.strip().startswith('ALIVE'):
537             return True
538         else:
539             msg = "Unresponsive host "
540             self.error(msg, out, err)
541             return False
542
543     def copy(self, src, dst):
544         if self.localhost:
545             (out, err), proc = execfuncs.lcopy(source, dest, 
546                     recursive = True,
547                     strict_host_checking = False)
548         else:
549             with self._lock:
550                 (out, err), proc = sshfuncs.rcopy(
551                     src, dst, 
552                     port = self.get("port"),
553                     identity = self.get("identity"),
554                     server_key = self.get("serverKey"),
555                     recursive = True,
556                     strict_host_checking = False)
557
558         return (out, err), proc
559
560     def execute(self, command,
561             sudo = False,
562             stdin = None, 
563             env = None,
564             tty = False,
565             forward_x11 = False,
566             timeout = None,
567             retry = 3,
568             err_on_timeout = True,
569             connect_timeout = 30,
570             strict_host_checking = False,
571             persistent = True,
572             with_lock = False
573             ):
574         """ Notice that this invocation will block until the
575         execution finishes. If this is not the desired behavior,
576         use 'run' instead."""
577
578         if self.localhost:
579             (out, err), proc = execfuncs.lexec(command, 
580                     user = user,
581                     sudo = sudo,
582                     stdin = stdin,
583                     env = env)
584         else:
585             if with_lock:
586                 with self._lock:
587                     (out, err), proc = sshfuncs.rexec(
588                         command, 
589                         host = self.get("hostname"),
590                         user = self.get("username"),
591                         port = self.get("port"),
592                         agent = True,
593                         sudo = sudo,
594                         stdin = stdin,
595                         identity = self.get("identity"),
596                         server_key = self.get("serverKey"),
597                         env = env,
598                         tty = tty,
599                         forward_x11 = forward_x11,
600                         timeout = timeout,
601                         retry = retry,
602                         err_on_timeout = err_on_timeout,
603                         connect_timeout = connect_timeout,
604                         persistent = persistent,
605                         strict_host_checking = strict_host_checking
606                         )
607             else:
608                 (out, err), proc = sshfuncs.rexec(
609                     command, 
610                     host = self.get("hostname"),
611                     user = self.get("username"),
612                     port = self.get("port"),
613                     agent = True,
614                     sudo = sudo,
615                     stdin = stdin,
616                     identity = self.get("identity"),
617                     server_key = self.get("serverKey"),
618                     env = env,
619                     tty = tty,
620                     forward_x11 = forward_x11,
621                     timeout = timeout,
622                     retry = retry,
623                     err_on_timeout = err_on_timeout,
624                     connect_timeout = connect_timeout,
625                     persistent = persistent
626                     )
627
628         return (out, err), proc
629
630     def run(self, command, home,
631             create_home = False,
632             pidfile = 'pidfile',
633             stdin = None, 
634             stdout = 'stdout', 
635             stderr = 'stderr', 
636             sudo = False,
637             tty = False):
638         
639         self.debug("Running command '%s'" % command)
640         
641         if self.localhost:
642             (out, err), proc = execfuncs.lspawn(command, pidfile, 
643                     stdout = stdout, 
644                     stderr = stderr, 
645                     stdin = stdin, 
646                     home = home, 
647                     create_home = create_home, 
648                     sudo = sudo,
649                     user = user) 
650         else:
651             with self._lock:
652                 (out, err), proc = sshfuncs.rspawn(
653                     command,
654                     pidfile = pidfile,
655                     home = home,
656                     create_home = create_home,
657                     stdin = stdin if stdin is not None else '/dev/null',
658                     stdout = stdout if stdout else '/dev/null',
659                     stderr = stderr if stderr else '/dev/null',
660                     sudo = sudo,
661                     host = self.get("hostname"),
662                     user = self.get("username"),
663                     port = self.get("port"),
664                     agent = True,
665                     identity = self.get("identity"),
666                     server_key = self.get("serverKey"),
667                     tty = tty
668                     )
669
670         return (out, err), proc
671
672     def getpid(self, home, pidfile = "pidfile"):
673         if self.localhost:
674             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
675         else:
676             with self._lock:
677                 pidtuple = sshfuncs.rgetpid(
678                     os.path.join(home, pidfile),
679                     host = self.get("hostname"),
680                     user = self.get("username"),
681                     port = self.get("port"),
682                     agent = True,
683                     identity = self.get("identity"),
684                     server_key = self.get("serverKey")
685                     )
686         
687         return pidtuple
688
689     def status(self, pid, ppid):
690         if self.localhost:
691             status = execfuncs.lstatus(pid, ppid)
692         else:
693             with self._lock:
694                 status = sshfuncs.rstatus(
695                         pid, ppid,
696                         host = self.get("hostname"),
697                         user = self.get("username"),
698                         port = self.get("port"),
699                         agent = True,
700                         identity = self.get("identity"),
701                         server_key = self.get("serverKey")
702                         )
703            
704         return status
705     
706     def kill(self, pid, ppid, sudo = False):
707         out = err = ""
708         proc = None
709         status = self.status(pid, ppid)
710
711         if status == sshfuncs.ProcStatus.RUNNING:
712             if self.localhost:
713                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
714             else:
715                 with self._lock:
716                     (out, err), proc = sshfuncs.rkill(
717                         pid, ppid,
718                         host = self.get("hostname"),
719                         user = self.get("username"),
720                         port = self.get("port"),
721                         agent = True,
722                         sudo = sudo,
723                         identity = self.get("identity"),
724                         server_key = self.get("serverKey")
725                         )
726
727         return (out, err), proc
728