Bugfixing LinuxNode and LinuxApplication
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33
34 # TODO: Verify files and dirs exists already
35 # TODO: Blacklist nodes!
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 reschedule_delay = "0.5s"
40
41 class ExitCode:
42     """
43     Error codes that the rexitcode function can return if unable to
44     check the exit code of a spawned process
45     """
46     FILENOTFOUND = -1
47     CORRUPTFILE = -2
48     ERROR = -3
49     OK = 0
50
51 @clsinit
52 class LinuxNode(ResourceManager):
53     _rtype = "LinuxNode"
54
55     @classmethod
56     def _register_attributes(cls):
57         hostname = Attribute("hostname", "Hostname of the machine",
58                 flags = Flags.ExecReadOnly)
59
60         username = Attribute("username", "Local account username", 
61                 flags = Flags.Credential)
62
63         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
64         
65         home = Attribute("home",
66                 "Experiment home directory to store all experiment related files",
67                 flags = Flags.ExecReadOnly)
68         
69         identity = Attribute("identity", "SSH identity file",
70                 flags = Flags.Credential)
71         
72         server_key = Attribute("serverKey", "Server public key", 
73                 flags = Flags.ExecReadOnly)
74         
75         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
76                 " from home folder before starting experiment", 
77                 flags = Flags.ExecReadOnly)
78         
79         clean_processes = Attribute("cleanProcesses", 
80                 "Kill all running processes before starting experiment",
81                 flags = Flags.ExecReadOnly)
82         
83         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
84                 "releasing the resource",
85                 flags = Flags.ExecReadOnly)
86
87         cls._register_attribute(hostname)
88         cls._register_attribute(username)
89         cls._register_attribute(port)
90         cls._register_attribute(home)
91         cls._register_attribute(identity)
92         cls._register_attribute(server_key)
93         cls._register_attribute(clean_home)
94         cls._register_attribute(clean_processes)
95         cls._register_attribute(tear_down)
96
97     def __init__(self, ec, guid):
98         super(LinuxNode, self).__init__(ec, guid)
99         self._os = None
100         
101         # lock to avoid concurrency issues on methods used by applications 
102         self._lock = threading.Lock()
103     
104     def log_message(self, msg):
105         return " guid %d - host %s - %s " % (self.guid, 
106                 self.get("hostname"), msg)
107
108     @property
109     def home(self):
110         return self.get("home") or ""
111
112     @property
113     def exp_home(self):
114         return os.path.join(self.home, self.ec.exp_id)
115
116     @property
117     def node_home(self):
118         node_home = "node-%d" % self.guid
119         return os.path.join(self.exp_home, node_home)
120
121     @property
122     def os(self):
123         if self._os:
124             return self._os
125
126         if (not self.get("hostname") or not self.get("username")):
127             msg = "Can't resolve OS, insufficient data "
128             self.error(msg)
129             raise RuntimeError, msg
130
131         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
132
133         if err and proc.poll():
134             msg = "Error detecting OS "
135             self.error(msg, out, err)
136             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
137
138         if out.find("Fedora release 12") == 0:
139             self._os = "f12"
140         elif out.find("Fedora release 14") == 0:
141             self._os = "f14"
142         elif out.find("Debian") == 0: 
143             self._os = "debian"
144         elif out.find("Ubuntu") ==0:
145             self._os = "ubuntu"
146         else:
147             msg = "Unsupported OS"
148             self.error(msg, out)
149             raise RuntimeError, "%s - %s " %( msg, out )
150
151         return self._os
152
153     @property
154     def localhost(self):
155         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
156
157     def provision(self):
158         if not self.is_alive():
159             self._state = ResourceState.FAILED
160             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
161             self.error(msg)
162             raise RuntimeError, msg
163
164         if self.get("cleanProcesses"):
165             self.clean_processes()
166
167         if self.get("cleanHome"):
168             self.clean_home()
169        
170         self.mkdir(self.node_home)
171
172         super(LinuxNode, self).provision()
173
174     def deploy(self):
175         if self.state == ResourceState.NEW:
176             try:
177                self.discover()
178                self.provision()
179             except:
180                 self._state = ResourceState.FAILED
181                 raise
182
183         # Node needs to wait until all associated interfaces are 
184         # ready before it can finalize deployment
185         from nepi.resources.linux.interface import LinuxInterface
186         ifaces = self.get_connected(LinuxInterface.rtype())
187         for iface in ifaces:
188             if iface.state < ResourceState.READY:
189                 self.ec.schedule(reschedule_delay, self.deploy)
190                 return 
191
192         super(LinuxNode, self).deploy()
193
194     def release(self):
195         tear_down = self.get("tearDown")
196         if tear_down:
197             self.execute(tear_down)
198
199         super(LinuxNode, self).release()
200
201     def valid_connection(self, guid):
202         # TODO: Validate!
203         return True
204
205     def clean_processes(self, killer = False):
206         self.info("Cleaning up processes")
207         
208         if killer:
209             # Hardcore kill
210             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
211                 "sudo -S killall python tcpdump || /bin/true ; " +
212                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
213                 "sudo -S killall -u root || /bin/true ; " +
214                 "sudo -S killall -u root || /bin/true ; ")
215         else:
216             # Be gentler...
217             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
218                 "sudo -S killall tcpdump || /bin/true ; " +
219                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
220                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
221
222         out = err = ""
223         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
224             
225     def clean_home(self):
226         self.info("Cleaning up home")
227         
228         cmd = (
229             # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
230             "find . -maxdepth 1 -name 'nepi-*' " +
231             " -execdir rm -rf {} + "
232             )
233             
234         if self.home:
235             cmd = "cd %s ; " % self.home + cmd
236
237         out = err = ""
238         (out, err), proc = self.execute(cmd, with_lock = True)
239
240     def upload(self, src, dst, text = False):
241         """ Copy content to destination
242
243            src  content to copy. Can be a local file, directory or a list of files
244
245            dst  destination path on the remote host (remote is always self.host)
246
247            text src is text input, it must be stored into a temp file before uploading
248         """
249         # If source is a string input 
250         f = None
251         if text and not os.path.isfile(src):
252             # src is text input that should be uploaded as file
253             # create a temporal file with the content to upload
254             f = tempfile.NamedTemporaryFile(delete=False)
255             f.write(src)
256             f.close()
257             src = f.name
258
259         if not self.localhost:
260             # Build destination as <user>@<server>:<path>
261             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
262
263         result = self.copy(src, dst)
264
265         # clean up temp file
266         if f:
267             os.remove(f.name)
268
269         return result
270
271     def download(self, src, dst):
272         if not self.localhost:
273             # Build destination as <user>@<server>:<path>
274             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
275         return self.copy(src, dst)
276
277     def install_packages(self, packages, home):
278         command = ""
279         if self.os in ["f12", "f14"]:
280             command = rpmfuncs.install_packages_command(self.os, packages)
281         elif self.os in ["debian", "ubuntu"]:
282             command = debfuncs.install_packages_command(self.os, packages)
283         else:
284             msg = "Error installing packages ( OS not known ) "
285             self.error(msg, self.os)
286             raise RuntimeError, msg
287
288         out = err = ""
289         (out, err), proc = self.run_and_wait(command, home, 
290             shfile = "instpkg.sh",
291             pidfile = "instpkg_pidfile",
292             ecodefile = "instpkg_exitcode",
293             stdout = "instpkg_stdout", 
294             stderr = "instpkg_stderr",
295             raise_on_error = True)
296
297         return (out, err), proc 
298
299     def remove_packages(self, packages, home):
300         command = ""
301         if self.os in ["f12", "f14"]:
302             command = rpmfuncs.remove_packages_command(self.os, packages)
303         elif self.os in ["debian", "ubuntu"]:
304             command = debfuncs.remove_packages_command(self.os, packages)
305         else:
306             msg = "Error removing packages ( OS not known ) "
307             self.error(msg)
308             raise RuntimeError, msg
309
310         out = err = ""
311         (out, err), proc = self.run_and_wait(command, home, 
312             shfile = "rmpkg.sh",
313             pidfile = "rmpkg_pidfile",
314             ecodefile = "rmpkg_exitcode",
315             stdout = "rmpkg_stdout", 
316             stderr = "rmpkg_stderr",
317             raise_on_error = True)
318          
319         return (out, err), proc 
320
321     def mkdir(self, path, clean = False):
322         if clean:
323             self.rmdir(path)
324
325         return self.execute("mkdir -p %s" % path, with_lock = True)
326
327     def rmdir(self, path):
328         return self.execute("rm -rf %s" % path, with_lock = True)
329         
330     def run_and_wait(self, command, home, 
331             shfile = "cmd.sh",
332             pidfile = "pidfile", 
333             ecodefile = "exitcode", 
334             stdin = None, 
335             stdout = "stdout", 
336             stderr = "stderr", 
337             sudo = False,
338             tty = False,
339             raise_on_error = False):
340         """ 
341         runs a command in background on the remote host, busy-waiting
342         until the command finishes execution.
343         This is more robust than doing a simple synchronized 'execute',
344         since in the remote host the command can continue to run detached
345         even if network disconnections occur
346         """
347         self.upload_command(command, home, shfile, ecodefile)
348
349         command = "bash ./%s" % shfile
350         # run command in background in remote host
351         (out, err), proc = self.run(command, home, 
352                 pidfile = pidfile,
353                 stdin = stdin, 
354                 stdout = stdout, 
355                 stderr = stderr, 
356                 sudo = sudo,
357                 tty = tty)
358
359         # check no errors occurred
360         if proc.poll() and err:
361             msg = " Failed to run command '%s' " % command
362             self.error(msg, out, err)
363             if raise_on_error:
364                 raise RuntimeError, msg
365
366         # Wait for pid file to be generated
367         pid, ppid = self.wait_pid(
368                 home = home, 
369                 pidfile = pidfile, 
370                 raise_on_error = raise_on_error)
371
372         # wait until command finishes to execute
373         self.wait_run(pid, ppid)
374       
375         (out, err), proc = self.check_errors(home, ecodefile, stderr)
376
377         # Out is what was written in the stderr file
378         if out or err:
379             msg = " Failed to run command '%s' " % command
380             self.error(msg, out, err)
381
382             if raise_on_error:
383                 raise RuntimeError, msg
384         
385         return (out, err), proc
386
387     def exitcode(self, home, ecodefile = "exitcode"):
388         """
389         Get the exit code of an application.
390         Returns an integer value with the exit code 
391         """
392         (out, err), proc = self.check_output(home, ecodefile)
393
394         # Succeeded to open file, return exit code in the file
395         if proc.wait() == 0:
396             try:
397                 return int(out.strip())
398             except:
399                 # Error in the content of the file!
400                 return ExitCode.CORRUPTFILE
401
402         # No such file or directory
403         if proc.returncode == 1:
404             return ExitCode.FILENOTFOUND
405         
406         # Other error from 'cat'
407         return ExitCode.ERROR
408
409     def upload_command(self, command, home, 
410             shfile = "cmd.sh",
411             ecodefile = "exitcode",
412             env = None):
413
414         command = "{ ( %(command)s ) ; } ; echo $? > %(ecodefile)s " % {
415                 'command': command,
416                 'ecodefile': ecodefile,
417                 } 
418
419         # Export environment
420         environ = ""
421         if env:
422             for var in env.split(" "):
423                 environ += 'export %s\n' % var
424
425         command = environ + command
426
427         dst = os.path.join(home, shfile)
428         return self.upload(command, dst, text = True)
429
430     def check_errors(self, home, 
431             ecodefile = "exitcode", 
432             stderr = "stderr"):
433         """
434         Checks whether errors occurred while running a command.
435         It first checks the exit code for the command, and only if the
436         exit code is an error one it returns the error output.
437         """
438         out = err = ""
439         proc = None
440
441         # get Exit code
442         ecode = self.exitcode(home, ecodefile)
443
444         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
445             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
446         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
447             # The process returned an error code or didn't exist. 
448             # Check standard error.
449             (out, err), proc = self.check_output(home, stderr)
450             
451             # If the stderr file was not found, assume nothing happened.
452             # We just ignore the error.
453             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: # cat - No such file or directory
454                 err = ""
455        
456         return (out, err), proc
457  
458     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
459         """ Waits until the pid file for the command is generated, 
460             and returns the pid and ppid of the process """
461         pid = ppid = None
462         delay = 1.0
463
464         for i in xrange(4):
465             pidtuple = self.getpid(home = home, pidfile = pidfile)
466             
467             if pidtuple:
468                 pid, ppid = pidtuple
469                 break
470             else:
471                 time.sleep(delay)
472                 delay = delay * 1.5
473         else:
474             msg = " Failed to get pid for pidfile %s/%s " % (
475                     home, pidfile )
476             self.error(msg)
477             
478             if raise_on_error:
479                 raise RuntimeError, msg
480
481         return pid, ppid
482
483     def wait_run(self, pid, ppid, trial = 0):
484         """ wait for a remote process to finish execution """
485         start_delay = 1.0
486
487         while True:
488             status = self.status(pid, ppid)
489             
490             if status is ProcStatus.FINISHED:
491                 break
492             elif status is not ProcStatus.RUNNING:
493                 delay = delay * 1.5
494                 time.sleep(delay)
495                 # If it takes more than 20 seconds to start, then
496                 # asume something went wrong
497                 if delay > 20:
498                     break
499             else:
500                 # The app is running, just wait...
501                 time.sleep(0.5)
502
503     def check_output(self, home, filename):
504         """ Retrives content of file """
505         (out, err), proc = self.execute("cat %s" % 
506             os.path.join(home, filename), retry = 1, with_lock = True)
507         return (out, err), proc
508
509     def is_alive(self):
510         if self.localhost:
511             return True
512
513         out = err = ""
514         try:
515             # TODO: FIX NOT ALIVE!!!!
516             (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
517                     with_lock = True)
518         except:
519             import traceback
520             trace = traceback.format_exc()
521             msg = "Unresponsive host  %s " % err
522             self.error(msg, out, trace)
523             return False
524
525         if out.strip().startswith('ALIVE'):
526             return True
527         else:
528             msg = "Unresponsive host "
529             self.error(msg, out, err)
530             return False
531
532     def copy(self, src, dst):
533         if self.localhost:
534             (out, err), proc = execfuncs.lcopy(source, dest, 
535                     recursive = True,
536                     strict_host_checking = False)
537         else:
538             with self._lock:
539                 (out, err), proc = sshfuncs.rcopy(
540                     src, dst, 
541                     port = self.get("port"),
542                     identity = self.get("identity"),
543                     server_key = self.get("serverKey"),
544                     recursive = True,
545                     strict_host_checking = False)
546
547         return (out, err), proc
548
549     def execute(self, command,
550             sudo = False,
551             stdin = None, 
552             env = None,
553             tty = False,
554             forward_x11 = False,
555             timeout = None,
556             retry = 3,
557             err_on_timeout = True,
558             connect_timeout = 30,
559             strict_host_checking = False,
560             persistent = True,
561             with_lock = False
562             ):
563         """ Notice that this invocation will block until the
564         execution finishes. If this is not the desired behavior,
565         use 'run' instead."""
566
567         if self.localhost:
568             (out, err), proc = execfuncs.lexec(command, 
569                     user = user,
570                     sudo = sudo,
571                     stdin = stdin,
572                     env = env)
573         else:
574             if with_lock:
575                 with self._lock:
576                     (out, err), proc = sshfuncs.rexec(
577                         command, 
578                         host = self.get("hostname"),
579                         user = self.get("username"),
580                         port = self.get("port"),
581                         agent = True,
582                         sudo = sudo,
583                         stdin = stdin,
584                         identity = self.get("identity"),
585                         server_key = self.get("serverKey"),
586                         env = env,
587                         tty = tty,
588                         forward_x11 = forward_x11,
589                         timeout = timeout,
590                         retry = retry,
591                         err_on_timeout = err_on_timeout,
592                         connect_timeout = connect_timeout,
593                         persistent = persistent,
594                         strict_host_checking = strict_host_checking
595                         )
596             else:
597                 (out, err), proc = sshfuncs.rexec(
598                     command, 
599                     host = self.get("hostname"),
600                     user = self.get("username"),
601                     port = self.get("port"),
602                     agent = True,
603                     sudo = sudo,
604                     stdin = stdin,
605                     identity = self.get("identity"),
606                     server_key = self.get("serverKey"),
607                     env = env,
608                     tty = tty,
609                     forward_x11 = forward_x11,
610                     timeout = timeout,
611                     retry = retry,
612                     err_on_timeout = err_on_timeout,
613                     connect_timeout = connect_timeout,
614                     persistent = persistent
615                     )
616
617         return (out, err), proc
618
619     def run(self, command, home,
620             create_home = False,
621             pidfile = 'pidfile',
622             stdin = None, 
623             stdout = 'stdout', 
624             stderr = 'stderr', 
625             sudo = False,
626             tty = False):
627         
628         self.debug("Running command '%s'" % command)
629         
630         if self.localhost:
631             (out, err), proc = execfuncs.lspawn(command, pidfile, 
632                     stdout = stdout, 
633                     stderr = stderr, 
634                     stdin = stdin, 
635                     home = home, 
636                     create_home = create_home, 
637                     sudo = sudo,
638                     user = user) 
639         else:
640             with self._lock:
641                 (out, err), proc = sshfuncs.rspawn(
642                     command,
643                     pidfile = pidfile,
644                     home = home,
645                     create_home = create_home,
646                     stdin = stdin if stdin is not None else '/dev/null',
647                     stdout = stdout if stdout else '/dev/null',
648                     stderr = stderr if stderr else '/dev/null',
649                     sudo = sudo,
650                     host = self.get("hostname"),
651                     user = self.get("username"),
652                     port = self.get("port"),
653                     agent = True,
654                     identity = self.get("identity"),
655                     server_key = self.get("serverKey"),
656                     tty = tty
657                     )
658
659         return (out, err), proc
660
661     def getpid(self, home, pidfile = "pidfile"):
662         if self.localhost:
663             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
664         else:
665             with self._lock:
666                 pidtuple = sshfuncs.rgetpid(
667                     os.path.join(home, pidfile),
668                     host = self.get("hostname"),
669                     user = self.get("username"),
670                     port = self.get("port"),
671                     agent = True,
672                     identity = self.get("identity"),
673                     server_key = self.get("serverKey")
674                     )
675         
676         return pidtuple
677
678     def status(self, pid, ppid):
679         if self.localhost:
680             status = execfuncs.lstatus(pid, ppid)
681         else:
682             with self._lock:
683                 status = sshfuncs.rstatus(
684                         pid, ppid,
685                         host = self.get("hostname"),
686                         user = self.get("username"),
687                         port = self.get("port"),
688                         agent = True,
689                         identity = self.get("identity"),
690                         server_key = self.get("serverKey")
691                         )
692            
693         return status
694     
695     def kill(self, pid, ppid, sudo = False):
696         out = err = ""
697         proc = None
698         status = self.status(pid, ppid)
699
700         if status == sshfuncs.ProcStatus.RUNNING:
701             if self.localhost:
702                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
703             else:
704                 with self._lock:
705                     (out, err), proc = sshfuncs.rkill(
706                         pid, ppid,
707                         host = self.get("hostname"),
708                         user = self.get("username"),
709                         port = self.get("port"),
710                         agent = True,
711                         sudo = sudo,
712                         identity = self.get("identity"),
713                         server_key = self.get("serverKey")
714                         )
715
716         return (out, err), proc
717