Adding PlanetLab resources
[nepi.git] / src / nepi / resources / linux / node.py
1 """
2     NEPI, a framework to manage network experiments
3     Copyright (C) 2013 INRIA
4
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 """
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs 
24
25 import collections
26 import logging
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33
34 # TODO: Verify files and dirs exists already
35 # TODO: Blacklist nodes!
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 reschedule_delay = "0.5s"
40
41
42 @clsinit
43 class LinuxNode(ResourceManager):
44     _rtype = "LinuxNode"
45
46     @classmethod
47     def _register_attributes(cls):
48         hostname = Attribute("hostname", "Hostname of the machine",
49                 flags = Flags.ExecReadOnly)
50
51         username = Attribute("username", "Local account username", 
52                 flags = Flags.Credential)
53
54         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
55         
56         home = Attribute("home",
57                 "Experiment home directory to store all experiment related files",
58                 flags = Flags.ExecReadOnly)
59         
60         identity = Attribute("identity", "SSH identity file",
61                 flags = Flags.Credential)
62         
63         server_key = Attribute("serverKey", "Server public key", 
64                 flags = Flags.ExecReadOnly)
65         
66         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
67                 " from home folder before starting experiment", 
68                 flags = Flags.ExecReadOnly)
69         
70         clean_processes = Attribute("cleanProcesses", 
71                 "Kill all running processes before starting experiment",
72                 flags = Flags.ExecReadOnly)
73         
74         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
75                 "releasing the resource",
76                 flags = Flags.ExecReadOnly)
77
78         cls._register_attribute(hostname)
79         cls._register_attribute(username)
80         cls._register_attribute(port)
81         cls._register_attribute(home)
82         cls._register_attribute(identity)
83         cls._register_attribute(server_key)
84         cls._register_attribute(clean_home)
85         cls._register_attribute(clean_processes)
86         cls._register_attribute(tear_down)
87
88     def __init__(self, ec, guid):
89         super(LinuxNode, self).__init__(ec, guid)
90         self._os = None
91         
92         # lock to avoid concurrency issues on methods used by applications 
93         self._lock = threading.Lock()
94
95         self._logger = logging.getLogger("LinuxNode")
96     
97     def log_message(self, msg):
98         return " guid %d - host %s - %s " % (self.guid, 
99                 self.get("hostname"), msg)
100
101     @property
102     def home(self):
103         return self.get("home") or ""
104
105     @property
106     def exp_home(self):
107         return os.path.join(self.home, self.ec.exp_id)
108
109     @property
110     def node_home(self):
111         node_home = "node-%d" % self.guid
112         return os.path.join(self.exp_home, node_home)
113
114     @property
115     def os(self):
116         if self._os:
117             return self._os
118
119         if (not self.get("hostname") or not self.get("username")):
120             msg = "Can't resolve OS, insufficient data "
121             self.error(msg)
122             raise RuntimeError, msg
123
124         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
125
126         if err and proc.poll():
127             msg = "Error detecting OS "
128             self.error(msg, out, err)
129             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
130
131         if out.find("Fedora release 12") == 0:
132             self._os = "f12"
133         elif out.find("Fedora release 14") == 0:
134             self._os = "f14"
135         elif out.find("Debian") == 0: 
136             self._os = "debian"
137         elif out.find("Ubuntu") ==0:
138             self._os = "ubuntu"
139         else:
140             msg = "Unsupported OS"
141             self.error(msg, out)
142             raise RuntimeError, "%s - %s " %( msg, out )
143
144         return self._os
145
146     @property
147     def localhost(self):
148         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
149
150     def provision(self):
151         if not self.is_alive():
152             self._state = ResourceState.FAILED
153             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
154             self.error(msg)
155             raise RuntimeError, msg
156
157         if self.get("cleanProcesses"):
158             self.clean_processes()
159
160         if self.get("cleanHome"):
161             self.clean_home()
162        
163         self.mkdir(self.node_home)
164
165         super(LinuxNode, self).provision()
166
167     def deploy(self):
168         if self.state == ResourceState.NEW:
169             try:
170                self.discover()
171                self.provision()
172             except:
173                 self._state = ResourceState.FAILED
174                 raise
175
176         # Node needs to wait until all associated interfaces are 
177         # ready before it can finalize deployment
178         from nepi.resources.linux.interface import LinuxInterface
179         ifaces = self.get_connected(LinuxInterface.rtype())
180         for iface in ifaces:
181             if iface.state < ResourceState.READY:
182                 self.ec.schedule(reschedule_delay, self.deploy)
183                 return 
184
185         super(LinuxNode, self).deploy()
186
187     def release(self):
188         tear_down = self.get("tearDown")
189         if tear_down:
190             self.execute(tear_down)
191
192         super(LinuxNode, self).release()
193
194     def valid_connection(self, guid):
195         # TODO: Validate!
196         return True
197
198     def clean_processes(self, killer = False):
199         self.info("Cleaning up processes")
200         
201         if killer:
202             # Hardcore kill
203             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
204                 "sudo -S killall python tcpdump || /bin/true ; " +
205                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
206                 "sudo -S killall -u root || /bin/true ; " +
207                 "sudo -S killall -u root || /bin/true ; ")
208         else:
209             # Be gentler...
210             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
211                 "sudo -S killall tcpdump || /bin/true ; " +
212                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
213                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
214
215         out = err = ""
216         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
217             
218     def clean_home(self):
219         self.info("Cleaning up home")
220         
221         cmd = (
222             # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
223             "find . -maxdepth 1 -name 'nepi-*' " +
224             " -execdir rm -rf {} + "
225             )
226             
227         if self.home:
228             cmd = "cd %s ; " % self.home + cmd
229
230         out = err = ""
231         (out, err), proc = self.execute(cmd, with_lock = True)
232
233     def upload(self, src, dst, text = False):
234         """ Copy content to destination
235
236            src  content to copy. Can be a local file, directory or a list of files
237
238            dst  destination path on the remote host (remote is always self.host)
239
240            text src is text input, it must be stored into a temp file before uploading
241         """
242         # If source is a string input 
243         f = None
244         if text and not os.path.isfile(src):
245             # src is text input that should be uploaded as file
246             # create a temporal file with the content to upload
247             f = tempfile.NamedTemporaryFile(delete=False)
248             f.write(src)
249             f.close()
250             src = f.name
251
252         if not self.localhost:
253             # Build destination as <user>@<server>:<path>
254             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
255
256         result = self.copy(src, dst)
257
258         # clean up temp file
259         if f:
260             os.remove(f.name)
261
262         return result
263
264     def download(self, src, dst):
265         if not self.localhost:
266             # Build destination as <user>@<server>:<path>
267             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
268         return self.copy(src, dst)
269
270     def install_packages(self, packages, home = None):
271         home = home or self.node_home
272
273         cmd = ""
274         if self.os in ["f12", "f14"]:
275             cmd = rpmfuncs.install_packages_command(self.os, packages)
276         elif self.os in ["debian", "ubuntu"]:
277             cmd = debfuncs.install_packages_command(self.os, packages)
278         else:
279             msg = "Error installing packages ( OS not known ) "
280             self.error(msg, self.os)
281             raise RuntimeError, msg
282
283         out = err = ""
284         (out, err), proc = self.run_and_wait(cmd, home, 
285             pidfile = "instpkg_pid",
286             stdout = "instpkg_out", 
287             stderr = "instpkg_err",
288             raise_on_error = True)
289
290         return (out, err), proc 
291
292     def remove_packages(self, packages, home = None):
293         home = home or self.node_home
294
295         cmd = ""
296         if self.os in ["f12", "f14"]:
297             cmd = rpmfuncs.remove_packages_command(self.os, packages)
298         elif self.os in ["debian", "ubuntu"]:
299             cmd = debfuncs.remove_packages_command(self.os, packages)
300         else:
301             msg = "Error removing packages ( OS not known ) "
302             self.error(msg)
303             raise RuntimeError, msg
304
305         out = err = ""
306         (out, err), proc = self.run_and_wait(cmd, home, 
307             pidfile = "rmpkg_pid",
308             stdout = "rmpkg_out", 
309             stderr = "rmpkg_err",
310             raise_on_error = True)
311          
312         return (out, err), proc 
313
314     def mkdir(self, path, clean = False):
315         if clean:
316             self.rmdir(path)
317
318         return self.execute("mkdir -p %s" % path, with_lock = True)
319
320     def rmdir(self, path):
321         return self.execute("rm -rf %s" % path, with_lock = True)
322
323     def run_and_wait(self, command, 
324             home = ".", 
325             pidfile = "pid", 
326             stdin = None, 
327             stdout = 'stdout', 
328             stderr = 'stderr', 
329             sudo = False,
330             tty = False,
331             raise_on_error = False):
332         """ runs a command in background on the remote host, but waits
333             until the command finishes execution.
334             This is more robust than doing a simple synchronized 'execute',
335             since in the remote host the command can continue to run detached
336             even if network disconnections occur
337         """
338         # run command in background in remote host
339         (out, err), proc = self.run(command, home, 
340                 pidfile = pidfile,
341                 stdin = stdin, 
342                 stdout = stdout, 
343                 stderr = stderr, 
344                 sudo = sudo,
345                 tty = tty)
346
347         # check no errors occurred
348         if proc.poll() and err:
349             msg = " Failed to run command '%s' " % command
350             self.error(msg, out, err)
351             if raise_on_error:
352                 raise RuntimeError, msg
353
354         # Wait for pid file to be generated
355         pid, ppid = self.wait_pid(
356                 home = home, 
357                 pidfile = pidfile, 
358                 raise_on_error = raise_on_error)
359
360         # wait until command finishes to execute
361         self.wait_run(pid, ppid)
362        
363         # check if execution errors occurred
364         (out, err), proc = self.check_output(home, stderr)
365
366         if err or out:
367             msg = " Failed to run command '%s' " % command
368             self.error(msg, out, err)
369
370             if raise_on_error:
371                 raise RuntimeError, msg
372         
373         return (out, err), proc
374  
375     def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
376         """ Waits until the pid file for the command is generated, 
377             and returns the pid and ppid of the process """
378         pid = ppid = None
379         delay = 1.0
380         for i in xrange(5):
381             pidtuple = self.checkpid(home = home, pidfile = pidfile)
382             
383             if pidtuple:
384                 pid, ppid = pidtuple
385                 break
386             else:
387                 time.sleep(delay)
388                 delay = min(30,delay*1.2)
389         else:
390             msg = " Failed to get pid for pidfile %s/%s " % (
391                     home, pidfile )
392             self.error(msg)
393             
394             if raise_on_error:
395                 raise RuntimeError, msg
396
397         return pid, ppid
398
399     def wait_run(self, pid, ppid, trial = 0):
400         """ wait for a remote process to finish execution """
401         delay = 1.0
402         first = True
403         bustspin = 0
404
405         while True:
406             status = self.status(pid, ppid)
407             
408             if status is sshfuncs.FINISHED:
409                 break
410             elif status is not sshfuncs.RUNNING:
411                 bustspin += 1
412                 time.sleep(delay*(5.5+random.random()))
413                 if bustspin > 12:
414                     break
415             else:
416                 if first:
417                     first = False
418
419                 time.sleep(delay*(0.5+random.random()))
420                 delay = min(30,delay*1.2)
421                 bustspin = 0
422
423     def check_output(self, home, filename):
424         """ checks file content """
425         (out, err), proc = self.execute("cat %s" % 
426             os.path.join(home, filename), retry = 1, with_lock = True)
427         return (out, err), proc
428
429     def is_alive(self):
430         if self.localhost:
431             return True
432
433         out = err = ""
434         try:
435             # TODO: FIX NOT ALIVE!!!!
436             (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
437                     with_lock = True)
438         except:
439             import traceback
440             trace = traceback.format_exc()
441             msg = "Unresponsive host  %s " % err
442             self.error(msg, out, trace)
443             return False
444
445         if out.strip().startswith('ALIVE'):
446             return True
447         else:
448             msg = "Unresponsive host "
449             self.error(msg, out, err)
450             return False
451
452     def copy(self, src, dst):
453         if self.localhost:
454             (out, err), proc =  execfuncs.lcopy(source, dest, 
455                     recursive = True,
456                     strict_host_checking = False)
457         else:
458             with self._lock:
459                 (out, err), proc = sshfuncs.rcopy(
460                     src, dst, 
461                     port = self.get("port"),
462                     identity = self.get("identity"),
463                     server_key = self.get("serverKey"),
464                     recursive = True,
465                     strict_host_checking = False)
466
467         return (out, err), proc
468
469     def execute(self, command,
470             sudo = False,
471             stdin = None, 
472             env = None,
473             tty = False,
474             forward_x11 = False,
475             timeout = None,
476             retry = 3,
477             err_on_timeout = True,
478             connect_timeout = 30,
479             strict_host_checking = False,
480             persistent = True,
481             with_lock = False
482             ):
483         """ Notice that this invocation will block until the
484         execution finishes. If this is not the desired behavior,
485         use 'run' instead."""
486
487         if self.localhost:
488             (out, err), proc = execfuncs.lexec(command, 
489                     user = user,
490                     sudo = sudo,
491                     stdin = stdin,
492                     env = env)
493         else:
494             if with_lock:
495                 with self._lock:
496                     (out, err), proc = sshfuncs.rexec(
497                         command, 
498                         host = self.get("hostname"),
499                         user = self.get("username"),
500                         port = self.get("port"),
501                         agent = True,
502                         sudo = sudo,
503                         stdin = stdin,
504                         identity = self.get("identity"),
505                         server_key = self.get("serverKey"),
506                         env = env,
507                         tty = tty,
508                         forward_x11 = forward_x11,
509                         timeout = timeout,
510                         retry = retry,
511                         err_on_timeout = err_on_timeout,
512                         connect_timeout = connect_timeout,
513                         persistent = persistent,
514                         strict_host_checking = strict_host_checking
515                         )
516             else:
517                 (out, err), proc = sshfuncs.rexec(
518                     command, 
519                     host = self.get("hostname"),
520                     user = self.get("username"),
521                     port = self.get("port"),
522                     agent = True,
523                     sudo = sudo,
524                     stdin = stdin,
525                     identity = self.get("identity"),
526                     server_key = self.get("serverKey"),
527                     env = env,
528                     tty = tty,
529                     forward_x11 = forward_x11,
530                     timeout = timeout,
531                     retry = retry,
532                     err_on_timeout = err_on_timeout,
533                     connect_timeout = connect_timeout,
534                     persistent = persistent
535                     )
536
537         return (out, err), proc
538
539     def run(self, command, 
540             home = None,
541             create_home = False,
542             pidfile = "pid",
543             stdin = None, 
544             stdout = 'stdout', 
545             stderr = 'stderr', 
546             sudo = False,
547             tty = False):
548
549         self.debug("Running command '%s'" % command)
550         
551         if self.localhost:
552             (out, err), proc = execfuncs.lspawn(command, pidfile, 
553                     stdout = stdout, 
554                     stderr = stderr, 
555                     stdin = stdin, 
556                     home = home, 
557                     create_home = create_home, 
558                     sudo = sudo,
559                     user = user) 
560         else:
561             # Start process in a "daemonized" way, using nohup and heavy
562             # stdin/out redirection to avoid connection issues
563             with self._lock:
564                 (out,err), proc = sshfuncs.rspawn(
565                     command,
566                     pidfile = pidfile,
567                     home = home,
568                     create_home = create_home,
569                     stdin = stdin if stdin is not None else '/dev/null',
570                     stdout = stdout if stdout else '/dev/null',
571                     stderr = stderr if stderr else '/dev/null',
572                     sudo = sudo,
573                     host = self.get("hostname"),
574                     user = self.get("username"),
575                     port = self.get("port"),
576                     agent = True,
577                     identity = self.get("identity"),
578                     server_key = self.get("serverKey"),
579                     tty = tty
580                     )
581
582         return (out, err), proc
583
584     def checkpid(self, home = ".", pidfile = "pid"):
585         if self.localhost:
586             pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
587         else:
588             with self._lock:
589                 pidtuple = sshfuncs.rcheckpid(
590                     os.path.join(home, pidfile),
591                     host = self.get("hostname"),
592                     user = self.get("username"),
593                     port = self.get("port"),
594                     agent = True,
595                     identity = self.get("identity"),
596                     server_key = self.get("serverKey")
597                     )
598         
599         return pidtuple
600     
601     def status(self, pid, ppid):
602         if self.localhost:
603             status = execfuncs.lstatus(pid, ppid)
604         else:
605             with self._lock:
606                 status = sshfuncs.rstatus(
607                         pid, ppid,
608                         host = self.get("hostname"),
609                         user = self.get("username"),
610                         port = self.get("port"),
611                         agent = True,
612                         identity = self.get("identity"),
613                         server_key = self.get("serverKey")
614                         )
615            
616         return status
617     
618     def kill(self, pid, ppid, sudo = False):
619         out = err = ""
620         proc = None
621         status = self.status(pid, ppid)
622
623         if status == sshfuncs.RUNNING:
624             if self.localhost:
625                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
626             else:
627                 with self._lock:
628                     (out, err), proc = sshfuncs.rkill(
629                         pid, ppid,
630                         host = self.get("hostname"),
631                         user = self.get("username"),
632                         port = self.get("port"),
633                         agent = True,
634                         sudo = sudo,
635                         identity = self.get("identity"),
636                         server_key = self.get("serverKey")
637                         )
638         return (out, err), proc
639
640     def check_bad_host(self, out, err):
641         badre = re.compile(r'(?:'
642                            r'|Error: disk I/O error'
643                            r')', 
644                            re.I)
645         return badre.search(out) or badre.search(err)
646