Added file transfer example
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33
34 # TODO: Verify files and dirs exists already
35 # TODO: Blacklist nodes!
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 reschedule_delay = "0.5s"
40
41 class ExitCode:
42     """
43     Error codes that the rexitcode function can return if unable to
44     check the exit code of a spawned process
45     """
46     FILENOTFOUND = -1
47     CORRUPTFILE = -2
48     ERROR = -3
49     OK = 0
50
51 class OSType:
52     """
53     Supported flavors of Linux OS
54     """
55     FEDORA_12 = "f12"
56     FEDORA_14 = "f14"
57     FEDORA = "fedora"
58     UBUNTU = "ubuntu"
59     DEBIAN = "debian"
60
61 @clsinit
62 class LinuxNode(ResourceManager):
63     _rtype = "LinuxNode"
64
65     @classmethod
66     def _register_attributes(cls):
67         hostname = Attribute("hostname", "Hostname of the machine",
68                 flags = Flags.ExecReadOnly)
69
70         username = Attribute("username", "Local account username", 
71                 flags = Flags.Credential)
72
73         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
74         
75         home = Attribute("home",
76                 "Experiment home directory to store all experiment related files",
77                 flags = Flags.ExecReadOnly)
78         
79         identity = Attribute("identity", "SSH identity file",
80                 flags = Flags.Credential)
81         
82         server_key = Attribute("serverKey", "Server public key", 
83                 flags = Flags.ExecReadOnly)
84         
85         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
86                 " from home folder before starting experiment", 
87                 flags = Flags.ExecReadOnly)
88         
89         clean_processes = Attribute("cleanProcesses", 
90                 "Kill all running processes before starting experiment",
91                 flags = Flags.ExecReadOnly)
92         
93         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
94                 "releasing the resource",
95                 flags = Flags.ExecReadOnly)
96
97         cls._register_attribute(hostname)
98         cls._register_attribute(username)
99         cls._register_attribute(port)
100         cls._register_attribute(home)
101         cls._register_attribute(identity)
102         cls._register_attribute(server_key)
103         cls._register_attribute(clean_home)
104         cls._register_attribute(clean_processes)
105         cls._register_attribute(tear_down)
106
107     def __init__(self, ec, guid):
108         super(LinuxNode, self).__init__(ec, guid)
109         self._os = None
110         
111         # lock to avoid concurrency issues on methods used by applications 
112         self._lock = threading.Lock()
113     
114     def log_message(self, msg):
115         return " guid %d - host %s - %s " % (self.guid, 
116                 self.get("hostname"), msg)
117
118     @property
119     def home(self):
120         return self.get("home") or ""
121
122     @property
123     def exp_home(self):
124         return os.path.join(self.home, self.ec.exp_id)
125
126     @property
127     def node_home(self):
128         node_home = "node-%d" % self.guid
129         return os.path.join(self.exp_home, node_home)
130
131     @property
132     def os(self):
133         if self._os:
134             return self._os
135
136         if (not self.get("hostname") or not self.get("username")):
137             msg = "Can't resolve OS, insufficient data "
138             self.error(msg)
139             raise RuntimeError, msg
140
141         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
142
143         if err and proc.poll():
144             msg = "Error detecting OS "
145             self.error(msg, out, err)
146             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
147
148         if out.find("Fedora release 12") == 0:
149             self._os = OSType.FEDORA_12
150         elif out.find("Fedora release 14") == 0:
151             self._os = OSType.FEDORA_14
152         elif out.find("Debian") == 0: 
153             self._os = OSType.DEBIAN
154         elif out.find("Ubuntu") ==0:
155             self._os = OSType.UBUNTU
156         else:
157             msg = "Unsupported OS"
158             self.error(msg, out)
159             raise RuntimeError, "%s - %s " %( msg, out )
160
161         return self._os
162
163     @property
164     def localhost(self):
165         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
166
167     def provision(self):
168         if not self.is_alive():
169             self._state = ResourceState.FAILED
170             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
171             self.error(msg)
172             raise RuntimeError, msg
173
174         if self.get("cleanProcesses"):
175             self.clean_processes()
176
177         if self.get("cleanHome"):
178             self.clean_home()
179        
180         self.mkdir(self.node_home)
181
182         super(LinuxNode, self).provision()
183
184     def deploy(self):
185         if self.state == ResourceState.NEW:
186             try:
187                 self.discover()
188                 self.provision()
189             except:
190                 self._state = ResourceState.FAILED
191                 raise
192
193         # Node needs to wait until all associated interfaces are 
194         # ready before it can finalize deployment
195         from nepi.resources.linux.interface import LinuxInterface
196         ifaces = self.get_connected(LinuxInterface.rtype())
197         for iface in ifaces:
198             if iface.state < ResourceState.READY:
199                 self.ec.schedule(reschedule_delay, self.deploy)
200                 return 
201
202         super(LinuxNode, self).deploy()
203
204     def release(self):
205         tear_down = self.get("tearDown")
206         if tear_down:
207             self.execute(tear_down)
208
209         super(LinuxNode, self).release()
210
211     def valid_connection(self, guid):
212         # TODO: Validate!
213         return True
214
215     def clean_processes(self, killer = False):
216         self.info("Cleaning up processes")
217         
218         if killer:
219             # Hardcore kill
220             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
221                 "sudo -S killall python tcpdump || /bin/true ; " +
222                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
223                 "sudo -S killall -u root || /bin/true ; " +
224                 "sudo -S killall -u root || /bin/true ; ")
225         else:
226             # Be gentler...
227             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
228                 "sudo -S killall tcpdump || /bin/true ; " +
229                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
230                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
231
232         out = err = ""
233         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
234             
235     def clean_home(self):
236         self.info("Cleaning up home")
237         
238         cmd = (
239             # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
240             "find . -maxdepth 1 -name 'nepi-*' " +
241             " -execdir rm -rf {} + "
242             )
243             
244         if self.home:
245             cmd = "cd %s ; " % self.home + cmd
246
247         out = err = ""
248         (out, err), proc = self.execute(cmd, with_lock = True)
249
250     def upload(self, src, dst, text = False):
251         """ Copy content to destination
252
253            src  content to copy. Can be a local file, directory or a list of files
254
255            dst  destination path on the remote host (remote is always self.host)
256
257            text src is text input, it must be stored into a temp file before uploading
258         """
259         # If source is a string input 
260         f = None
261         if text and not os.path.isfile(src):
262             # src is text input that should be uploaded as file
263             # create a temporal file with the content to upload
264             f = tempfile.NamedTemporaryFile(delete=False)
265             f.write(src)
266             f.close()
267             src = f.name
268
269         if not self.localhost:
270             # Build destination as <user>@<server>:<path>
271             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
272         result = self.copy(src, dst)
273
274         # clean up temp file
275         if f:
276             os.remove(f.name)
277
278         return result
279
280     def download(self, src, dst):
281         if not self.localhost:
282             # Build destination as <user>@<server>:<path>
283             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
284         return self.copy(src, dst)
285
286     def install_packages(self, packages, home):
287         command = ""
288         if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
289             command = rpmfuncs.install_packages_command(self.os, packages)
290         elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
291             command = debfuncs.install_packages_command(self.os, packages)
292         else:
293             msg = "Error installing packages ( OS not known ) "
294             self.error(msg, self.os)
295             raise RuntimeError, msg
296
297         out = err = ""
298         (out, err), proc = self.run_and_wait(command, home, 
299             shfile = "instpkg.sh",
300             pidfile = "instpkg_pidfile",
301             ecodefile = "instpkg_exitcode",
302             stdout = "instpkg_stdout", 
303             stderr = "instpkg_stderr",
304             raise_on_error = True)
305
306         return (out, err), proc 
307
308     def remove_packages(self, packages, home):
309         command = ""
310         if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
311             command = rpmfuncs.remove_packages_command(self.os, packages)
312         elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
313             command = debfuncs.remove_packages_command(self.os, packages)
314         else:
315             msg = "Error removing packages ( OS not known ) "
316             self.error(msg)
317             raise RuntimeError, msg
318
319         out = err = ""
320         (out, err), proc = self.run_and_wait(command, home, 
321             shfile = "rmpkg.sh",
322             pidfile = "rmpkg_pidfile",
323             ecodefile = "rmpkg_exitcode",
324             stdout = "rmpkg_stdout", 
325             stderr = "rmpkg_stderr",
326             raise_on_error = True)
327          
328         return (out, err), proc 
329
330     def mkdir(self, path, clean = False):
331         if clean:
332             self.rmdir(path)
333
334         return self.execute("mkdir -p %s" % path, with_lock = True)
335
336     def rmdir(self, path):
337         return self.execute("rm -rf %s" % path, with_lock = True)
338         
339     def run_and_wait(self, command, home, 
340             shfile = "cmd.sh",
341             env = None,
342             pidfile = "pidfile", 
343             ecodefile = "exitcode", 
344             stdin = None, 
345             stdout = "stdout", 
346             stderr = "stderr", 
347             sudo = False,
348             tty = False,
349             raise_on_error = False):
350         """ 
351         runs a command in background on the remote host, busy-waiting
352         until the command finishes execution.
353         This is more robust than doing a simple synchronized 'execute',
354         since in the remote host the command can continue to run detached
355         even if network disconnections occur
356         """
357         self.upload_command(command, home, 
358             shfile = shfile, 
359             ecodefile = ecodefile, 
360             env = env)
361
362         command = "bash ./%s" % shfile
363         # run command in background in remote host
364         (out, err), proc = self.run(command, home, 
365                 pidfile = pidfile,
366                 stdin = stdin, 
367                 stdout = stdout, 
368                 stderr = stderr, 
369                 sudo = sudo,
370                 tty = tty)
371
372         # check no errors occurred
373         if proc.poll() and err:
374             msg = " Failed to run command '%s' " % command
375             self.error(msg, out, err)
376             if raise_on_error:
377                 raise RuntimeError, msg
378
379         # Wait for pid file to be generated
380         pid, ppid = self.wait_pid(
381                 home = home, 
382                 pidfile = pidfile, 
383                 raise_on_error = raise_on_error)
384
385         # wait until command finishes to execute
386         self.wait_run(pid, ppid)
387       
388         (out, err), proc = self.check_errors(home, ecodefile, stderr)
389
390         # Out is what was written in the stderr file
391         if out or err:
392             msg = " Failed to run command '%s' " % command
393             self.error(msg, out, err)
394
395             if raise_on_error:
396                 raise RuntimeError, msg
397         
398         return (out, err), proc
399
400     def exitcode(self, home, ecodefile = "exitcode"):
401         """
402         Get the exit code of an application.
403         Returns an integer value with the exit code 
404         """
405         (out, err), proc = self.check_output(home, ecodefile)
406
407         # Succeeded to open file, return exit code in the file
408         if proc.wait() == 0:
409             try:
410                 return int(out.strip())
411             except:
412                 # Error in the content of the file!
413                 return ExitCode.CORRUPTFILE
414
415         # No such file or directory
416         if proc.returncode == 1:
417             return ExitCode.FILENOTFOUND
418         
419         # Other error from 'cat'
420         return ExitCode.ERROR
421
422     def upload_command(self, command, home, 
423             shfile = "cmd.sh",
424             ecodefile = "exitcode",
425             env = None):
426         """ Saves the command as a bash script file in the remote host, and
427         forces to save the exit code of the command execution to the ecodefile
428         """
429       
430         # The exit code of the command will be stored in ecodefile
431         command = " %(command)s ; echo $? > %(ecodefile)s ;" % {
432                 'command': command,
433                 'ecodefile': ecodefile,
434                 } 
435
436         # Export environment
437         environ = "\n".join(map(lambda e: "export %s" % e, env.split(" "))) + "\n" \
438             if env else ""
439
440         # Add environ to command
441         command = environ + command
442
443         dst = os.path.join(home, shfile)
444         return self.upload(command, dst, text = True)
445
446     def check_errors(self, home, 
447             ecodefile = "exitcode", 
448             stderr = "stderr"):
449         """
450         Checks whether errors occurred while running a command.
451         It first checks the exit code for the command, and only if the
452         exit code is an error one it returns the error output.
453         """
454         out = err = ""
455         proc = None
456
457         # get Exit code
458         ecode = self.exitcode(home, ecodefile)
459
460         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
461             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
462         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
463             # The process returned an error code or didn't exist. 
464             # Check standard error.
465             (out, err), proc = self.check_output(home, stderr)
466             
467             # If the stderr file was not found, assume nothing happened.
468             # We just ignore the error.
469             # (cat returns 1 for error "No such file or directory")
470             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
471                 out = err = ""
472        
473         return (out, err), proc
474  
475     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
476         """ Waits until the pid file for the command is generated, 
477             and returns the pid and ppid of the process """
478         pid = ppid = None
479         delay = 1.0
480
481         for i in xrange(4):
482             pidtuple = self.getpid(home = home, pidfile = pidfile)
483             
484             if pidtuple:
485                 pid, ppid = pidtuple
486                 break
487             else:
488                 time.sleep(delay)
489                 delay = delay * 1.5
490         else:
491             msg = " Failed to get pid for pidfile %s/%s " % (
492                     home, pidfile )
493             self.error(msg)
494             
495             if raise_on_error:
496                 raise RuntimeError, msg
497
498         return pid, ppid
499
500     def wait_run(self, pid, ppid, trial = 0):
501         """ wait for a remote process to finish execution """
502         start_delay = 1.0
503
504         while True:
505             status = self.status(pid, ppid)
506             
507             if status is ProcStatus.FINISHED:
508                 break
509             elif status is not ProcStatus.RUNNING:
510                 delay = delay * 1.5
511                 time.sleep(delay)
512                 # If it takes more than 20 seconds to start, then
513                 # asume something went wrong
514                 if delay > 20:
515                     break
516             else:
517                 # The app is running, just wait...
518                 time.sleep(0.5)
519
520     def check_output(self, home, filename):
521         """ Retrives content of file """
522         (out, err), proc = self.execute("cat %s" % 
523             os.path.join(home, filename), retry = 1, with_lock = True)
524         return (out, err), proc
525
526     def is_alive(self):
527         if self.localhost:
528             return True
529
530         out = err = ""
531         try:
532             # TODO: FIX NOT ALIVE!!!!
533             (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
534                     with_lock = True)
535         except:
536             import traceback
537             trace = traceback.format_exc()
538             msg = "Unresponsive host  %s " % err
539             self.error(msg, out, trace)
540             return False
541
542         if out.strip().startswith('ALIVE'):
543             return True
544         else:
545             msg = "Unresponsive host "
546             self.error(msg, out, err)
547             return False
548
549     def copy(self, src, dst):
550         if self.localhost:
551             (out, err), proc = execfuncs.lcopy(source, dest, 
552                     recursive = True,
553                     strict_host_checking = False)
554         else:
555             with self._lock:
556                 (out, err), proc = sshfuncs.rcopy(
557                     src, dst, 
558                     port = self.get("port"),
559                     identity = self.get("identity"),
560                     server_key = self.get("serverKey"),
561                     recursive = True,
562                     strict_host_checking = False)
563
564         return (out, err), proc
565
566     def execute(self, command,
567             sudo = False,
568             stdin = None, 
569             env = None,
570             tty = False,
571             forward_x11 = False,
572             timeout = None,
573             retry = 3,
574             err_on_timeout = True,
575             connect_timeout = 30,
576             strict_host_checking = False,
577             persistent = True,
578             blocking = True,
579             with_lock = False
580             ):
581         """ Notice that this invocation will block until the
582         execution finishes. If this is not the desired behavior,
583         use 'run' instead."""
584
585         if self.localhost:
586             (out, err), proc = execfuncs.lexec(command, 
587                     user = user,
588                     sudo = sudo,
589                     stdin = stdin,
590                     env = env)
591         else:
592             if with_lock:
593                 with self._lock:
594                     (out, err), proc = sshfuncs.rexec(
595                         command, 
596                         host = self.get("hostname"),
597                         user = self.get("username"),
598                         port = self.get("port"),
599                         agent = True,
600                         sudo = sudo,
601                         stdin = stdin,
602                         identity = self.get("identity"),
603                         server_key = self.get("serverKey"),
604                         env = env,
605                         tty = tty,
606                         forward_x11 = forward_x11,
607                         timeout = timeout,
608                         retry = retry,
609                         err_on_timeout = err_on_timeout,
610                         connect_timeout = connect_timeout,
611                         persistent = persistent,
612                         blocking = blocking, 
613                         strict_host_checking = strict_host_checking
614                         )
615             else:
616                 (out, err), proc = sshfuncs.rexec(
617                     command, 
618                     host = self.get("hostname"),
619                     user = self.get("username"),
620                     port = self.get("port"),
621                     agent = True,
622                     sudo = sudo,
623                     stdin = stdin,
624                     identity = self.get("identity"),
625                     server_key = self.get("serverKey"),
626                     env = env,
627                     tty = tty,
628                     forward_x11 = forward_x11,
629                     timeout = timeout,
630                     retry = retry,
631                     err_on_timeout = err_on_timeout,
632                     connect_timeout = connect_timeout,
633                     persistent = persistent,
634                     blocking = blocking, 
635                     strict_host_checking = strict_host_checking
636                     )
637
638         return (out, err), proc
639
640     def run(self, command, home,
641             create_home = False,
642             pidfile = 'pidfile',
643             stdin = None, 
644             stdout = 'stdout', 
645             stderr = 'stderr', 
646             sudo = False,
647             tty = False):
648         
649         self.debug("Running command '%s'" % command)
650         
651         if self.localhost:
652             (out, err), proc = execfuncs.lspawn(command, pidfile, 
653                     stdout = stdout, 
654                     stderr = stderr, 
655                     stdin = stdin, 
656                     home = home, 
657                     create_home = create_home, 
658                     sudo = sudo,
659                     user = user) 
660         else:
661             with self._lock:
662                 (out, err), proc = sshfuncs.rspawn(
663                     command,
664                     pidfile = pidfile,
665                     home = home,
666                     create_home = create_home,
667                     stdin = stdin if stdin is not None else '/dev/null',
668                     stdout = stdout if stdout else '/dev/null',
669                     stderr = stderr if stderr else '/dev/null',
670                     sudo = sudo,
671                     host = self.get("hostname"),
672                     user = self.get("username"),
673                     port = self.get("port"),
674                     agent = True,
675                     identity = self.get("identity"),
676                     server_key = self.get("serverKey"),
677                     tty = tty
678                     )
679
680         return (out, err), proc
681
682     def getpid(self, home, pidfile = "pidfile"):
683         if self.localhost:
684             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
685         else:
686             with self._lock:
687                 pidtuple = sshfuncs.rgetpid(
688                     os.path.join(home, pidfile),
689                     host = self.get("hostname"),
690                     user = self.get("username"),
691                     port = self.get("port"),
692                     agent = True,
693                     identity = self.get("identity"),
694                     server_key = self.get("serverKey")
695                     )
696         
697         return pidtuple
698
699     def status(self, pid, ppid):
700         if self.localhost:
701             status = execfuncs.lstatus(pid, ppid)
702         else:
703             with self._lock:
704                 status = sshfuncs.rstatus(
705                         pid, ppid,
706                         host = self.get("hostname"),
707                         user = self.get("username"),
708                         port = self.get("port"),
709                         agent = True,
710                         identity = self.get("identity"),
711                         server_key = self.get("serverKey")
712                         )
713            
714         return status
715     
716     def kill(self, pid, ppid, sudo = False):
717         out = err = ""
718         proc = None
719         status = self.status(pid, ppid)
720
721         if status == sshfuncs.ProcStatus.RUNNING:
722             if self.localhost:
723                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
724             else:
725                 with self._lock:
726                     (out, err), proc = sshfuncs.rkill(
727                         pid, ppid,
728                         host = self.get("hostname"),
729                         user = self.get("username"),
730                         port = self.get("port"),
731                         agent = True,
732                         sudo = sudo,
733                         identity = self.get("identity"),
734                         server_key = self.get("serverKey")
735                         )
736
737         return (out, err), proc
738