Adding CCN RMs for Linux Backend
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33
34 # TODO: Verify files and dirs exists already
35 # TODO: Blacklist nodes!
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 reschedule_delay = "0.5s"
40
41 class ExitCode:
42     """
43     Error codes that the rexitcode function can return if unable to
44     check the exit code of a spawned process
45     """
46     FILENOTFOUND = -1
47     CORRUPTFILE = -2
48     ERROR = -3
49     OK = 0
50
51 class OSType:
52     """
53     Supported flavors of Linux OS
54     """
55     FEDORA_12 = "f12"
56     FEDORA_14 = "f14"
57     FEDORA = "fedora"
58     UBUNTU = "ubuntu"
59     DEBIAN = "debian"
60
61 @clsinit
62 class LinuxNode(ResourceManager):
63     _rtype = "LinuxNode"
64
65     @classmethod
66     def _register_attributes(cls):
67         hostname = Attribute("hostname", "Hostname of the machine",
68                 flags = Flags.ExecReadOnly)
69
70         username = Attribute("username", "Local account username", 
71                 flags = Flags.Credential)
72
73         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
74         
75         home = Attribute("home",
76                 "Experiment home directory to store all experiment related files",
77                 flags = Flags.ExecReadOnly)
78         
79         identity = Attribute("identity", "SSH identity file",
80                 flags = Flags.Credential)
81         
82         server_key = Attribute("serverKey", "Server public key", 
83                 flags = Flags.ExecReadOnly)
84         
85         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
86                 " from home folder before starting experiment", 
87                 flags = Flags.ExecReadOnly)
88         
89         clean_processes = Attribute("cleanProcesses", 
90                 "Kill all running processes before starting experiment",
91                 flags = Flags.ExecReadOnly)
92         
93         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
94                 "releasing the resource",
95                 flags = Flags.ExecReadOnly)
96
97         cls._register_attribute(hostname)
98         cls._register_attribute(username)
99         cls._register_attribute(port)
100         cls._register_attribute(home)
101         cls._register_attribute(identity)
102         cls._register_attribute(server_key)
103         cls._register_attribute(clean_home)
104         cls._register_attribute(clean_processes)
105         cls._register_attribute(tear_down)
106
107     def __init__(self, ec, guid):
108         super(LinuxNode, self).__init__(ec, guid)
109         self._os = None
110         
111         # lock to avoid concurrency issues on methods used by applications 
112         self._lock = threading.Lock()
113     
114     def log_message(self, msg):
115         return " guid %d - host %s - %s " % (self.guid, 
116                 self.get("hostname"), msg)
117
118     @property
119     def home(self):
120         return self.get("home") or ""
121
122     @property
123     def exp_home(self):
124         return os.path.join(self.home, self.ec.exp_id)
125
126     @property
127     def node_home(self):
128         node_home = "node-%d" % self.guid
129         return os.path.join(self.exp_home, node_home)
130
131     @property
132     def os(self):
133         if self._os:
134             return self._os
135
136         if (not self.get("hostname") or not self.get("username")):
137             msg = "Can't resolve OS, insufficient data "
138             self.error(msg)
139             raise RuntimeError, msg
140
141         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
142
143         if err and proc.poll():
144             msg = "Error detecting OS "
145             self.error(msg, out, err)
146             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
147
148         if out.find("Fedora release 12") == 0:
149             self._os = OSType.FEDORA_12
150         elif out.find("Fedora release 14") == 0:
151             self._os = OSType.FEDORA_14
152         elif out.find("Debian") == 0: 
153             self._os = OSType.DEBIAN
154         elif out.find("Ubuntu") ==0:
155             self._os = OSType.UBUNTU
156         else:
157             msg = "Unsupported OS"
158             self.error(msg, out)
159             raise RuntimeError, "%s - %s " %( msg, out )
160
161         return self._os
162
163     @property
164     def localhost(self):
165         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
166
167     def provision(self):
168         if not self.is_alive():
169             self._state = ResourceState.FAILED
170             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
171             self.error(msg)
172             raise RuntimeError, msg
173
174         if self.get("cleanProcesses"):
175             self.clean_processes()
176
177         if self.get("cleanHome"):
178             self.clean_home()
179        
180         self.mkdir(self.node_home)
181
182         super(LinuxNode, self).provision()
183
184     def deploy(self):
185         if self.state == ResourceState.NEW:
186             try:
187                 self.discover()
188                 self.provision()
189             except:
190                 self._state = ResourceState.FAILED
191                 raise
192
193         # Node needs to wait until all associated interfaces are 
194         # ready before it can finalize deployment
195         from nepi.resources.linux.interface import LinuxInterface
196         ifaces = self.get_connected(LinuxInterface.rtype())
197         for iface in ifaces:
198             if iface.state < ResourceState.READY:
199                 self.ec.schedule(reschedule_delay, self.deploy)
200                 return 
201
202         super(LinuxNode, self).deploy()
203
204     def release(self):
205         tear_down = self.get("tearDown")
206         if tear_down:
207             self.execute(tear_down)
208
209         super(LinuxNode, self).release()
210
211     def valid_connection(self, guid):
212         # TODO: Validate!
213         return True
214
215     def clean_processes(self, killer = False):
216         self.info("Cleaning up processes")
217         
218         if killer:
219             # Hardcore kill
220             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
221                 "sudo -S killall python tcpdump || /bin/true ; " +
222                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
223                 "sudo -S killall -u root || /bin/true ; " +
224                 "sudo -S killall -u root || /bin/true ; ")
225         else:
226             # Be gentler...
227             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
228                 "sudo -S killall tcpdump || /bin/true ; " +
229                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
230                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
231
232         out = err = ""
233         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
234             
235     def clean_home(self):
236         self.info("Cleaning up home")
237         
238         cmd = (
239             # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
240             "find . -maxdepth 1 -name 'nepi-*' " +
241             " -execdir rm -rf {} + "
242             )
243             
244         if self.home:
245             cmd = "cd %s ; " % self.home + cmd
246
247         out = err = ""
248         (out, err), proc = self.execute(cmd, with_lock = True)
249
250     def upload(self, src, dst, text = False):
251         """ Copy content to destination
252
253            src  content to copy. Can be a local file, directory or a list of files
254
255            dst  destination path on the remote host (remote is always self.host)
256
257            text src is text input, it must be stored into a temp file before uploading
258         """
259         # If source is a string input 
260         f = None
261         if text and not os.path.isfile(src):
262             # src is text input that should be uploaded as file
263             # create a temporal file with the content to upload
264             f = tempfile.NamedTemporaryFile(delete=False)
265             f.write(src)
266             f.close()
267             src = f.name
268
269         if not self.localhost:
270             # Build destination as <user>@<server>:<path>
271             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
272         result = self.copy(src, dst)
273
274         # clean up temp file
275         if f:
276             os.remove(f.name)
277
278         return result
279
280     def download(self, src, dst):
281         if not self.localhost:
282             # Build destination as <user>@<server>:<path>
283             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
284         return self.copy(src, dst)
285
286     def install_packages(self, packages, home):
287         command = ""
288         if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
289             command = rpmfuncs.install_packages_command(self.os, packages)
290         elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
291             command = debfuncs.install_packages_command(self.os, packages)
292         else:
293             msg = "Error installing packages ( OS not known ) "
294             self.error(msg, self.os)
295             raise RuntimeError, msg
296
297         out = err = ""
298         (out, err), proc = self.run_and_wait(command, home, 
299             shfile = "instpkg.sh",
300             pidfile = "instpkg_pidfile",
301             ecodefile = "instpkg_exitcode",
302             stdout = "instpkg_stdout", 
303             stderr = "instpkg_stderr",
304             raise_on_error = True)
305
306         return (out, err), proc 
307
308     def remove_packages(self, packages, home):
309         command = ""
310         if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
311             command = rpmfuncs.remove_packages_command(self.os, packages)
312         elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
313             command = debfuncs.remove_packages_command(self.os, packages)
314         else:
315             msg = "Error removing packages ( OS not known ) "
316             self.error(msg)
317             raise RuntimeError, msg
318
319         out = err = ""
320         (out, err), proc = self.run_and_wait(command, home, 
321             shfile = "rmpkg.sh",
322             pidfile = "rmpkg_pidfile",
323             ecodefile = "rmpkg_exitcode",
324             stdout = "rmpkg_stdout", 
325             stderr = "rmpkg_stderr",
326             raise_on_error = True)
327          
328         return (out, err), proc 
329
330     def mkdir(self, path, clean = False):
331         if clean:
332             self.rmdir(path)
333
334         return self.execute("mkdir -p %s" % path, with_lock = True)
335
336     def rmdir(self, path):
337         return self.execute("rm -rf %s" % path, with_lock = True)
338         
339     def run_and_wait(self, command, home, 
340             shfile = "cmd.sh",
341             env = None,
342             pidfile = "pidfile", 
343             ecodefile = "exitcode", 
344             stdin = None, 
345             stdout = "stdout", 
346             stderr = "stderr", 
347             sudo = False,
348             tty = False,
349             raise_on_error = False):
350         """ 
351         runs a command in background on the remote host, busy-waiting
352         until the command finishes execution.
353         This is more robust than doing a simple synchronized 'execute',
354         since in the remote host the command can continue to run detached
355         even if network disconnections occur
356         """
357         self.upload_command(command, home, 
358             shfile = shfile, 
359             ecodefile = ecodefile, 
360             env = env)
361
362         command = "bash ./%s" % shfile
363         # run command in background in remote host
364         (out, err), proc = self.run(command, home, 
365                 pidfile = pidfile,
366                 stdin = stdin, 
367                 stdout = stdout, 
368                 stderr = stderr, 
369                 sudo = sudo,
370                 tty = tty)
371
372         # check no errors occurred
373         if proc.poll():
374             msg = " Failed to run command '%s' " % command
375             self.error(msg, out, err)
376             if raise_on_error:
377                 raise RuntimeError, msg
378
379         # Wait for pid file to be generated
380         pid, ppid = self.wait_pid(
381                 home = home, 
382                 pidfile = pidfile, 
383                 raise_on_error = raise_on_error)
384
385         # wait until command finishes to execute
386         self.wait_run(pid, ppid)
387       
388         (out, err), proc = self.check_errors(home, ecodefile, stderr)
389
390         # Out is what was written in the stderr file
391         if err:
392             msg = " Failed to run command '%s' " % command
393             self.error(msg, out, err)
394
395             if raise_on_error:
396                 raise RuntimeError, msg
397         
398         return (out, err), proc
399
400     def exitcode(self, home, ecodefile = "exitcode"):
401         """
402         Get the exit code of an application.
403         Returns an integer value with the exit code 
404         """
405         (out, err), proc = self.check_output(home, ecodefile)
406
407         # Succeeded to open file, return exit code in the file
408         if proc.wait() == 0:
409             try:
410                 return int(out.strip())
411             except:
412                 # Error in the content of the file!
413                 return ExitCode.CORRUPTFILE
414
415         # No such file or directory
416         if proc.returncode == 1:
417             return ExitCode.FILENOTFOUND
418         
419         # Other error from 'cat'
420         return ExitCode.ERROR
421
422     def upload_command(self, command, home, 
423             shfile = "cmd.sh",
424             ecodefile = "exitcode",
425             env = None):
426         """ Saves the command as a bash script file in the remote host, and
427         forces to save the exit code of the command execution to the ecodefile
428         """
429       
430         # The exit code of the command will be stored in ecodefile
431         command = " %(command)s ; echo $? > %(ecodefile)s ;" % {
432                 'command': command,
433                 'ecodefile': ecodefile,
434                 } 
435
436         # Export environment
437         environ = self.format_environment(env)
438
439         # Add environ to command
440         command = environ + command
441
442         dst = os.path.join(home, shfile)
443         return self.upload(command, dst, text = True)
444
445     def format_environment(self, env, inline = False):
446         """Format environmental variables for command to be executed either
447         as an inline command (i.e. PYTHONPATH=src/.. python script.py) or
448         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
449         """
450         sep = " " if inline else "\n"
451         export = " " if inline else "export"
452         return sep.join(map(lambda e: "%s %s" % (export, e),
453             env.strip().split(" "))) + sep if env else ""
454
455     def check_errors(self, home, 
456             ecodefile = "exitcode", 
457             stderr = "stderr",
458             stdout = "stdout"):
459         """
460         Checks whether errors occurred while running a command.
461         It first checks the exit code for the command, and only if the
462         exit code is an error one it returns the error output.
463
464         """
465         proc = None
466         err = ""
467         # retrive standard output from the file
468         (out, oerr), oproc = self.check_output(home, stdout)
469
470         # get exit code saved in the 'exitcode' file
471         ecode = self.exitcode(home, ecodefile)
472
473         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
474             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
475         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
476             # The process returned an error code or didn't exist. 
477             # Check standard error.
478             (err, eerr), proc = self.check_output(home, stderr)
479
480             # If the stderr file was not found, assume nothing bad happened,
481             # and just ignore the error.
482             # (cat returns 1 for error "No such file or directory")
483             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
484                 err = "" 
485             
486         return (out, err), proc
487  
488     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
489         """ Waits until the pid file for the command is generated, 
490             and returns the pid and ppid of the process """
491         pid = ppid = None
492         delay = 1.0
493
494         for i in xrange(4):
495             pidtuple = self.getpid(home = home, pidfile = pidfile)
496             
497             if pidtuple:
498                 pid, ppid = pidtuple
499                 break
500             else:
501                 time.sleep(delay)
502                 delay = delay * 1.5
503         else:
504             msg = " Failed to get pid for pidfile %s/%s " % (
505                     home, pidfile )
506             self.error(msg)
507             
508             if raise_on_error:
509                 raise RuntimeError, msg
510
511         return pid, ppid
512
513     def wait_run(self, pid, ppid, trial = 0):
514         """ wait for a remote process to finish execution """
515         start_delay = 1.0
516
517         while True:
518             status = self.status(pid, ppid)
519             
520             if status is ProcStatus.FINISHED:
521                 break
522             elif status is not ProcStatus.RUNNING:
523                 delay = delay * 1.5
524                 time.sleep(delay)
525                 # If it takes more than 20 seconds to start, then
526                 # asume something went wrong
527                 if delay > 20:
528                     break
529             else:
530                 # The app is running, just wait...
531                 time.sleep(0.5)
532
533     def check_output(self, home, filename):
534         """ Retrives content of file """
535         (out, err), proc = self.execute("cat %s" % 
536             os.path.join(home, filename), retry = 1, with_lock = True)
537         return (out, err), proc
538
539     def is_alive(self):
540         if self.localhost:
541             return True
542
543         out = err = ""
544         try:
545             # TODO: FIX NOT ALIVE!!!!
546             (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
547                     with_lock = True)
548         except:
549             import traceback
550             trace = traceback.format_exc()
551             msg = "Unresponsive host  %s " % err
552             self.error(msg, out, trace)
553             return False
554
555         if out.strip().startswith('ALIVE'):
556             return True
557         else:
558             msg = "Unresponsive host "
559             self.error(msg, out, err)
560             return False
561
562     def copy(self, src, dst):
563         if self.localhost:
564             (out, err), proc = execfuncs.lcopy(source, dest, 
565                     recursive = True,
566                     strict_host_checking = False)
567         else:
568             with self._lock:
569                 (out, err), proc = sshfuncs.rcopy(
570                     src, dst, 
571                     port = self.get("port"),
572                     identity = self.get("identity"),
573                     server_key = self.get("serverKey"),
574                     recursive = True,
575                     strict_host_checking = False)
576
577         return (out, err), proc
578
579     def execute(self, command,
580             sudo = False,
581             stdin = None, 
582             env = None,
583             tty = False,
584             forward_x11 = False,
585             timeout = None,
586             retry = 3,
587             err_on_timeout = True,
588             connect_timeout = 30,
589             strict_host_checking = False,
590             persistent = True,
591             blocking = True,
592             with_lock = False
593             ):
594         """ Notice that this invocation will block until the
595         execution finishes. If this is not the desired behavior,
596         use 'run' instead."""
597
598         if self.localhost:
599             (out, err), proc = execfuncs.lexec(command, 
600                     user = user,
601                     sudo = sudo,
602                     stdin = stdin,
603                     env = env)
604         else:
605             if with_lock:
606                 with self._lock:
607                     (out, err), proc = sshfuncs.rexec(
608                         command, 
609                         host = self.get("hostname"),
610                         user = self.get("username"),
611                         port = self.get("port"),
612                         agent = True,
613                         sudo = sudo,
614                         stdin = stdin,
615                         identity = self.get("identity"),
616                         server_key = self.get("serverKey"),
617                         env = env,
618                         tty = tty,
619                         forward_x11 = forward_x11,
620                         timeout = timeout,
621                         retry = retry,
622                         err_on_timeout = err_on_timeout,
623                         connect_timeout = connect_timeout,
624                         persistent = persistent,
625                         blocking = blocking, 
626                         strict_host_checking = strict_host_checking
627                         )
628             else:
629                 (out, err), proc = sshfuncs.rexec(
630                     command, 
631                     host = self.get("hostname"),
632                     user = self.get("username"),
633                     port = self.get("port"),
634                     agent = True,
635                     sudo = sudo,
636                     stdin = stdin,
637                     identity = self.get("identity"),
638                     server_key = self.get("serverKey"),
639                     env = env,
640                     tty = tty,
641                     forward_x11 = forward_x11,
642                     timeout = timeout,
643                     retry = retry,
644                     err_on_timeout = err_on_timeout,
645                     connect_timeout = connect_timeout,
646                     persistent = persistent,
647                     blocking = blocking, 
648                     strict_host_checking = strict_host_checking
649                     )
650
651         return (out, err), proc
652
653     def run(self, command, home,
654             create_home = False,
655             pidfile = 'pidfile',
656             stdin = None, 
657             stdout = 'stdout', 
658             stderr = 'stderr', 
659             sudo = False,
660             tty = False):
661         
662         self.debug("Running command '%s'" % command)
663         
664         if self.localhost:
665             (out, err), proc = execfuncs.lspawn(command, pidfile, 
666                     stdout = stdout, 
667                     stderr = stderr, 
668                     stdin = stdin, 
669                     home = home, 
670                     create_home = create_home, 
671                     sudo = sudo,
672                     user = user) 
673         else:
674             with self._lock:
675                 (out, err), proc = sshfuncs.rspawn(
676                     command,
677                     pidfile = pidfile,
678                     home = home,
679                     create_home = create_home,
680                     stdin = stdin if stdin is not None else '/dev/null',
681                     stdout = stdout if stdout else '/dev/null',
682                     stderr = stderr if stderr else '/dev/null',
683                     sudo = sudo,
684                     host = self.get("hostname"),
685                     user = self.get("username"),
686                     port = self.get("port"),
687                     agent = True,
688                     identity = self.get("identity"),
689                     server_key = self.get("serverKey"),
690                     tty = tty
691                     )
692
693         return (out, err), proc
694
695     def getpid(self, home, pidfile = "pidfile"):
696         if self.localhost:
697             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
698         else:
699             with self._lock:
700                 pidtuple = sshfuncs.rgetpid(
701                     os.path.join(home, pidfile),
702                     host = self.get("hostname"),
703                     user = self.get("username"),
704                     port = self.get("port"),
705                     agent = True,
706                     identity = self.get("identity"),
707                     server_key = self.get("serverKey")
708                     )
709         
710         return pidtuple
711
712     def status(self, pid, ppid):
713         if self.localhost:
714             status = execfuncs.lstatus(pid, ppid)
715         else:
716             with self._lock:
717                 status = sshfuncs.rstatus(
718                         pid, ppid,
719                         host = self.get("hostname"),
720                         user = self.get("username"),
721                         port = self.get("port"),
722                         agent = True,
723                         identity = self.get("identity"),
724                         server_key = self.get("serverKey")
725                         )
726            
727         return status
728     
729     def kill(self, pid, ppid, sudo = False):
730         out = err = ""
731         proc = None
732         status = self.status(pid, ppid)
733
734         if status == sshfuncs.ProcStatus.RUNNING:
735             if self.localhost:
736                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
737             else:
738                 with self._lock:
739                     (out, err), proc = sshfuncs.rkill(
740                         pid, ppid,
741                         host = self.get("hostname"),
742                         user = self.get("username"),
743                         port = self.get("port"),
744                         agent = True,
745                         sudo = sudo,
746                         identity = self.get("identity"),
747                         server_key = self.get("serverKey")
748                         )
749
750         return (out, err), proc
751