Adding authors and correcting licence information
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs 
24
25 import collections
26 import os
27 import random
28 import re
29 import tempfile
30 import time
31 import threading
32
33 # TODO: Verify files and dirs exists already
34 # TODO: Blacklist nodes!
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!! 
37
38 reschedule_delay = "0.5s"
39
40
41 @clsinit
42 class LinuxNode(ResourceManager):
43     _rtype = "LinuxNode"
44
45     @classmethod
46     def _register_attributes(cls):
47         hostname = Attribute("hostname", "Hostname of the machine",
48                 flags = Flags.ExecReadOnly)
49
50         username = Attribute("username", "Local account username", 
51                 flags = Flags.Credential)
52
53         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
54         
55         home = Attribute("home",
56                 "Experiment home directory to store all experiment related files",
57                 flags = Flags.ExecReadOnly)
58         
59         identity = Attribute("identity", "SSH identity file",
60                 flags = Flags.Credential)
61         
62         server_key = Attribute("serverKey", "Server public key", 
63                 flags = Flags.ExecReadOnly)
64         
65         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
66                 " from home folder before starting experiment", 
67                 flags = Flags.ExecReadOnly)
68         
69         clean_processes = Attribute("cleanProcesses", 
70                 "Kill all running processes before starting experiment",
71                 flags = Flags.ExecReadOnly)
72         
73         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
74                 "releasing the resource",
75                 flags = Flags.ExecReadOnly)
76
77         cls._register_attribute(hostname)
78         cls._register_attribute(username)
79         cls._register_attribute(port)
80         cls._register_attribute(home)
81         cls._register_attribute(identity)
82         cls._register_attribute(server_key)
83         cls._register_attribute(clean_home)
84         cls._register_attribute(clean_processes)
85         cls._register_attribute(tear_down)
86
87     def __init__(self, ec, guid):
88         super(LinuxNode, self).__init__(ec, guid)
89         self._os = None
90         
91         # lock to avoid concurrency issues on methods used by applications 
92         self._lock = threading.Lock()
93     
94     def log_message(self, msg):
95         return " guid %d - host %s - %s " % (self.guid, 
96                 self.get("hostname"), msg)
97
98     @property
99     def home(self):
100         return self.get("home") or ""
101
102     @property
103     def exp_home(self):
104         return os.path.join(self.home, self.ec.exp_id)
105
106     @property
107     def node_home(self):
108         node_home = "node-%d" % self.guid
109         return os.path.join(self.exp_home, node_home)
110
111     @property
112     def os(self):
113         if self._os:
114             return self._os
115
116         if (not self.get("hostname") or not self.get("username")):
117             msg = "Can't resolve OS, insufficient data "
118             self.error(msg)
119             raise RuntimeError, msg
120
121         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
122
123         if err and proc.poll():
124             msg = "Error detecting OS "
125             self.error(msg, out, err)
126             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
127
128         if out.find("Fedora release 12") == 0:
129             self._os = "f12"
130         elif out.find("Fedora release 14") == 0:
131             self._os = "f14"
132         elif out.find("Debian") == 0: 
133             self._os = "debian"
134         elif out.find("Ubuntu") ==0:
135             self._os = "ubuntu"
136         else:
137             msg = "Unsupported OS"
138             self.error(msg, out)
139             raise RuntimeError, "%s - %s " %( msg, out )
140
141         return self._os
142
143     @property
144     def localhost(self):
145         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
146
147     def provision(self):
148         if not self.is_alive():
149             self._state = ResourceState.FAILED
150             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
151             self.error(msg)
152             raise RuntimeError, msg
153
154         if self.get("cleanProcesses"):
155             self.clean_processes()
156
157         if self.get("cleanHome"):
158             self.clean_home()
159        
160         self.mkdir(self.node_home)
161
162         super(LinuxNode, self).provision()
163
164     def deploy(self):
165         if self.state == ResourceState.NEW:
166             try:
167                self.discover()
168                self.provision()
169             except:
170                 self._state = ResourceState.FAILED
171                 raise
172
173         # Node needs to wait until all associated interfaces are 
174         # ready before it can finalize deployment
175         from nepi.resources.linux.interface import LinuxInterface
176         ifaces = self.get_connected(LinuxInterface.rtype())
177         for iface in ifaces:
178             if iface.state < ResourceState.READY:
179                 self.ec.schedule(reschedule_delay, self.deploy)
180                 return 
181
182         super(LinuxNode, self).deploy()
183
184     def release(self):
185         tear_down = self.get("tearDown")
186         if tear_down:
187             self.execute(tear_down)
188
189         super(LinuxNode, self).release()
190
191     def valid_connection(self, guid):
192         # TODO: Validate!
193         return True
194
195     def clean_processes(self, killer = False):
196         self.info("Cleaning up processes")
197         
198         if killer:
199             # Hardcore kill
200             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
201                 "sudo -S killall python tcpdump || /bin/true ; " +
202                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
203                 "sudo -S killall -u root || /bin/true ; " +
204                 "sudo -S killall -u root || /bin/true ; ")
205         else:
206             # Be gentler...
207             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
208                 "sudo -S killall tcpdump || /bin/true ; " +
209                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
210                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
211
212         out = err = ""
213         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
214             
215     def clean_home(self):
216         self.info("Cleaning up home")
217         
218         cmd = (
219             # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
220             "find . -maxdepth 1 -name 'nepi-*' " +
221             " -execdir rm -rf {} + "
222             )
223             
224         if self.home:
225             cmd = "cd %s ; " % self.home + cmd
226
227         out = err = ""
228         (out, err), proc = self.execute(cmd, with_lock = True)
229
230     def upload(self, src, dst, text = False):
231         """ Copy content to destination
232
233            src  content to copy. Can be a local file, directory or a list of files
234
235            dst  destination path on the remote host (remote is always self.host)
236
237            text src is text input, it must be stored into a temp file before uploading
238         """
239         # If source is a string input 
240         f = None
241         if text and not os.path.isfile(src):
242             # src is text input that should be uploaded as file
243             # create a temporal file with the content to upload
244             f = tempfile.NamedTemporaryFile(delete=False)
245             f.write(src)
246             f.close()
247             src = f.name
248
249         if not self.localhost:
250             # Build destination as <user>@<server>:<path>
251             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
252
253         result = self.copy(src, dst)
254
255         # clean up temp file
256         if f:
257             os.remove(f.name)
258
259         return result
260
261     def download(self, src, dst):
262         if not self.localhost:
263             # Build destination as <user>@<server>:<path>
264             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
265         return self.copy(src, dst)
266
267     def install_packages(self, packages, home = None):
268         home = home or self.node_home
269
270         cmd = ""
271         if self.os in ["f12", "f14"]:
272             cmd = rpmfuncs.install_packages_command(self.os, packages)
273         elif self.os in ["debian", "ubuntu"]:
274             cmd = debfuncs.install_packages_command(self.os, packages)
275         else:
276             msg = "Error installing packages ( OS not known ) "
277             self.error(msg, self.os)
278             raise RuntimeError, msg
279
280         out = err = ""
281         (out, err), proc = self.run_and_wait(cmd, home, 
282             pidfile = "instpkg_pid",
283             stdout = "instpkg_out", 
284             stderr = "instpkg_err",
285             raise_on_error = True)
286
287         return (out, err), proc 
288
289     def remove_packages(self, packages, home = None):
290         home = home or self.node_home
291
292         cmd = ""
293         if self.os in ["f12", "f14"]:
294             cmd = rpmfuncs.remove_packages_command(self.os, packages)
295         elif self.os in ["debian", "ubuntu"]:
296             cmd = debfuncs.remove_packages_command(self.os, packages)
297         else:
298             msg = "Error removing packages ( OS not known ) "
299             self.error(msg)
300             raise RuntimeError, msg
301
302         out = err = ""
303         (out, err), proc = self.run_and_wait(cmd, home, 
304             pidfile = "rmpkg_pid",
305             stdout = "rmpkg_out", 
306             stderr = "rmpkg_err",
307             raise_on_error = True)
308          
309         return (out, err), proc 
310
311     def mkdir(self, path, clean = False):
312         if clean:
313             self.rmdir(path)
314
315         return self.execute("mkdir -p %s" % path, with_lock = True)
316
317     def rmdir(self, path):
318         return self.execute("rm -rf %s" % path, with_lock = True)
319
320     def run_and_wait(self, command, 
321             home = ".", 
322             pidfile = "pid", 
323             stdin = None, 
324             stdout = 'stdout', 
325             stderr = 'stderr', 
326             sudo = False,
327             tty = False,
328             raise_on_error = False):
329         """ runs a command in background on the remote host, but waits
330             until the command finishes execution.
331             This is more robust than doing a simple synchronized 'execute',
332             since in the remote host the command can continue to run detached
333             even if network disconnections occur
334         """
335         # run command in background in remote host
336         (out, err), proc = self.run(command, home, 
337                 pidfile = pidfile,
338                 stdin = stdin, 
339                 stdout = stdout, 
340                 stderr = stderr, 
341                 sudo = sudo,
342                 tty = tty)
343
344         # check no errors occurred
345         if proc.poll() and err:
346             msg = " Failed to run command '%s' " % command
347             self.error(msg, out, err)
348             if raise_on_error:
349                 raise RuntimeError, msg
350
351         # Wait for pid file to be generated
352         pid, ppid = self.wait_pid(
353                 home = home, 
354                 pidfile = pidfile, 
355                 raise_on_error = raise_on_error)
356
357         # wait until command finishes to execute
358         self.wait_run(pid, ppid)
359        
360         # check if execution errors occurred
361         (out, err), proc = self.check_output(home, stderr)
362
363         if err or out:
364             msg = " Failed to run command '%s' " % command
365             self.error(msg, out, err)
366
367             if raise_on_error:
368                 raise RuntimeError, msg
369         
370         return (out, err), proc
371  
372     def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
373         """ Waits until the pid file for the command is generated, 
374             and returns the pid and ppid of the process """
375         pid = ppid = None
376         delay = 1.0
377         for i in xrange(5):
378             pidtuple = self.checkpid(home = home, pidfile = pidfile)
379             
380             if pidtuple:
381                 pid, ppid = pidtuple
382                 break
383             else:
384                 time.sleep(delay)
385                 delay = min(30,delay*1.2)
386         else:
387             msg = " Failed to get pid for pidfile %s/%s " % (
388                     home, pidfile )
389             self.error(msg)
390             
391             if raise_on_error:
392                 raise RuntimeError, msg
393
394         return pid, ppid
395
396     def wait_run(self, pid, ppid, trial = 0):
397         """ wait for a remote process to finish execution """
398         delay = 1.0
399         first = True
400         bustspin = 0
401
402         while True:
403             status = self.status(pid, ppid)
404             
405             if status is sshfuncs.FINISHED:
406                 break
407             elif status is not sshfuncs.RUNNING:
408                 bustspin += 1
409                 time.sleep(delay*(5.5+random.random()))
410                 if bustspin > 12:
411                     break
412             else:
413                 if first:
414                     first = False
415
416                 time.sleep(delay*(0.5+random.random()))
417                 delay = min(30,delay*1.2)
418                 bustspin = 0
419
420     def check_output(self, home, filename):
421         """ checks file content """
422         (out, err), proc = self.execute("cat %s" % 
423             os.path.join(home, filename), retry = 1, with_lock = True)
424         return (out, err), proc
425
426     def is_alive(self):
427         if self.localhost:
428             return True
429
430         out = err = ""
431         try:
432             # TODO: FIX NOT ALIVE!!!!
433             (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
434                     with_lock = True)
435         except:
436             import traceback
437             trace = traceback.format_exc()
438             msg = "Unresponsive host  %s " % err
439             self.error(msg, out, trace)
440             return False
441
442         if out.strip().startswith('ALIVE'):
443             return True
444         else:
445             msg = "Unresponsive host "
446             self.error(msg, out, err)
447             return False
448
449     def copy(self, src, dst):
450         if self.localhost:
451             (out, err), proc =  execfuncs.lcopy(source, dest, 
452                     recursive = True,
453                     strict_host_checking = False)
454         else:
455             with self._lock:
456                 (out, err), proc = sshfuncs.rcopy(
457                     src, dst, 
458                     port = self.get("port"),
459                     identity = self.get("identity"),
460                     server_key = self.get("serverKey"),
461                     recursive = True,
462                     strict_host_checking = False)
463
464         return (out, err), proc
465
466     def execute(self, command,
467             sudo = False,
468             stdin = None, 
469             env = None,
470             tty = False,
471             forward_x11 = False,
472             timeout = None,
473             retry = 3,
474             err_on_timeout = True,
475             connect_timeout = 30,
476             strict_host_checking = False,
477             persistent = True,
478             with_lock = False
479             ):
480         """ Notice that this invocation will block until the
481         execution finishes. If this is not the desired behavior,
482         use 'run' instead."""
483
484         if self.localhost:
485             (out, err), proc = execfuncs.lexec(command, 
486                     user = user,
487                     sudo = sudo,
488                     stdin = stdin,
489                     env = env)
490         else:
491             if with_lock:
492                 with self._lock:
493                     (out, err), proc = sshfuncs.rexec(
494                         command, 
495                         host = self.get("hostname"),
496                         user = self.get("username"),
497                         port = self.get("port"),
498                         agent = True,
499                         sudo = sudo,
500                         stdin = stdin,
501                         identity = self.get("identity"),
502                         server_key = self.get("serverKey"),
503                         env = env,
504                         tty = tty,
505                         forward_x11 = forward_x11,
506                         timeout = timeout,
507                         retry = retry,
508                         err_on_timeout = err_on_timeout,
509                         connect_timeout = connect_timeout,
510                         persistent = persistent,
511                         strict_host_checking = strict_host_checking
512                         )
513             else:
514                 (out, err), proc = sshfuncs.rexec(
515                     command, 
516                     host = self.get("hostname"),
517                     user = self.get("username"),
518                     port = self.get("port"),
519                     agent = True,
520                     sudo = sudo,
521                     stdin = stdin,
522                     identity = self.get("identity"),
523                     server_key = self.get("serverKey"),
524                     env = env,
525                     tty = tty,
526                     forward_x11 = forward_x11,
527                     timeout = timeout,
528                     retry = retry,
529                     err_on_timeout = err_on_timeout,
530                     connect_timeout = connect_timeout,
531                     persistent = persistent
532                     )
533
534         return (out, err), proc
535
536     def run(self, command, 
537             home = None,
538             create_home = False,
539             pidfile = "pid",
540             stdin = None, 
541             stdout = 'stdout', 
542             stderr = 'stderr', 
543             sudo = False,
544             tty = False):
545
546         self.debug("Running command '%s'" % command)
547         
548         if self.localhost:
549             (out, err), proc = execfuncs.lspawn(command, pidfile, 
550                     stdout = stdout, 
551                     stderr = stderr, 
552                     stdin = stdin, 
553                     home = home, 
554                     create_home = create_home, 
555                     sudo = sudo,
556                     user = user) 
557         else:
558             # Start process in a "daemonized" way, using nohup and heavy
559             # stdin/out redirection to avoid connection issues
560             with self._lock:
561                 (out,err), proc = sshfuncs.rspawn(
562                     command,
563                     pidfile = pidfile,
564                     home = home,
565                     create_home = create_home,
566                     stdin = stdin if stdin is not None else '/dev/null',
567                     stdout = stdout if stdout else '/dev/null',
568                     stderr = stderr if stderr else '/dev/null',
569                     sudo = sudo,
570                     host = self.get("hostname"),
571                     user = self.get("username"),
572                     port = self.get("port"),
573                     agent = True,
574                     identity = self.get("identity"),
575                     server_key = self.get("serverKey"),
576                     tty = tty
577                     )
578
579         return (out, err), proc
580
581     def checkpid(self, home = ".", pidfile = "pid"):
582         if self.localhost:
583             pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
584         else:
585             with self._lock:
586                 pidtuple = sshfuncs.rcheckpid(
587                     os.path.join(home, pidfile),
588                     host = self.get("hostname"),
589                     user = self.get("username"),
590                     port = self.get("port"),
591                     agent = True,
592                     identity = self.get("identity"),
593                     server_key = self.get("serverKey")
594                     )
595         
596         return pidtuple
597     
598     def status(self, pid, ppid):
599         if self.localhost:
600             status = execfuncs.lstatus(pid, ppid)
601         else:
602             with self._lock:
603                 status = sshfuncs.rstatus(
604                         pid, ppid,
605                         host = self.get("hostname"),
606                         user = self.get("username"),
607                         port = self.get("port"),
608                         agent = True,
609                         identity = self.get("identity"),
610                         server_key = self.get("serverKey")
611                         )
612            
613         return status
614     
615     def kill(self, pid, ppid, sudo = False):
616         out = err = ""
617         proc = None
618         status = self.status(pid, ppid)
619
620         if status == sshfuncs.RUNNING:
621             if self.localhost:
622                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
623             else:
624                 with self._lock:
625                     (out, err), proc = sshfuncs.rkill(
626                         pid, ppid,
627                         host = self.get("hostname"),
628                         user = self.get("username"),
629                         port = self.get("port"),
630                         agent = True,
631                         sudo = sudo,
632                         identity = self.get("identity"),
633                         server_key = self.get("serverKey")
634                         )
635         return (out, err), proc
636
637     def check_bad_host(self, out, err):
638         badre = re.compile(r'(?:'
639                            r'|Error: disk I/O error'
640                            r')', 
641                            re.I)
642         return badre.search(out) or badre.search(err)
643