Added unit tests for linux application
[nepi.git] / src / neco / resources / linux / node.py
1 from neco.execution.attribute import Attribute, Flags
2 from neco.execution.resource import ResourceManager, clsinit, ResourceState
3 from neco.resources.linux import rpmfuncs, debfuncs 
4 from neco.util import sshfuncs, execfuncs 
5
6 import collections
7 import logging
8 import os
9 import random
10 import re
11 import tempfile
12 import time
13 import threading
14
15 # TODO: Verify files and dirs exists already
16 # TODO: Blacklist nodes!
17 # TODO: Unify delays!!
18
19 reschedule_delay = "0.5s"
20
21 @clsinit
22 class LinuxNode(ResourceManager):
23     _rtype = "LinuxNode"
24
25     @classmethod
26     def _register_attributes(cls):
27         hostname = Attribute("hostname", "Hostname of the machine",
28                 flags = Flags.ExecReadOnly)
29
30         username = Attribute("username", "Local account username", 
31                 flags = Flags.Credential)
32
33         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
34         
35         home = Attribute("home",
36                 "Experiment home directory to store all experiment related files",
37                 flags = Flags.ExecReadOnly)
38         
39         identity = Attribute("identity", "SSH identity file",
40                 flags = Flags.Credential)
41         
42         server_key = Attribute("serverKey", "Server public key", 
43                 flags = Flags.ExecReadOnly)
44         
45         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
46                 " from home folder before starting experiment", 
47                 flags = Flags.ExecReadOnly)
48         
49         clean_processes = Attribute("cleanProcesses", 
50                 "Kill all running processes before starting experiment",
51                 flags = Flags.ExecReadOnly)
52         
53         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
54                 "releasing the resource",
55                 flags = Flags.ExecReadOnly)
56
57         cls._register_attribute(hostname)
58         cls._register_attribute(username)
59         cls._register_attribute(port)
60         cls._register_attribute(home)
61         cls._register_attribute(identity)
62         cls._register_attribute(server_key)
63         cls._register_attribute(clean_home)
64         cls._register_attribute(clean_processes)
65         cls._register_attribute(tear_down)
66
67     def __init__(self, ec, guid):
68         super(LinuxNode, self).__init__(ec, guid)
69         self._os = None
70         
71         # lock to avoid concurrency issues on methods used by applications 
72         self._lock = threading.Lock()
73
74         self._logger = logging.getLogger("LinuxNode")
75     
76     def log_message(self, msg):
77         return " guid %d - host %s - %s " % (self.guid, 
78                 self.get("hostname"), msg)
79
80     @property
81     def home(self):
82         return self.get("home") or "/tmp"
83
84     @property
85     def exp_dir(self):
86         exp_dir = os.path.join(self.home, self.ec.exp_id)
87         return exp_dir if exp_dir.startswith('/') or \
88                 exp_dir.startswith("~/") else "~/"
89
90     @property
91     def node_home(self):
92         node_home = "node-%d" % self.guid
93         return os.path.join(self.exp_dir, node_home)
94
95     @property
96     def os(self):
97         if self._os:
98             return self._os
99
100         if (not self.get("hostname") or not self.get("username")):
101             msg = "Can't resolve OS, insufficient data "
102             self.error(msg)
103             raise RuntimeError, msg
104
105         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
106
107         if err and proc.poll():
108             msg = "Error detecting OS "
109             self.error(msg, out, err)
110             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
111
112         if out.find("Fedora release 12") == 0:
113             self._os = "f12"
114         elif out.find("Fedora release 14") == 0:
115             self._os = "f14"
116         elif out.find("Debian") == 0: 
117             self._os = "debian"
118         elif out.find("Ubuntu") ==0:
119             self._os = "ubuntu"
120         else:
121             msg = "Unsupported OS"
122             self.error(msg, out)
123             raise RuntimeError, "%s - %s " %( msg, out )
124
125         return self._os
126
127     @property
128     def localhost(self):
129         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
130
131     def provision(self, filters = None):
132         if not self.is_alive():
133             self._state = ResourceState.FAILED
134             self.error("Deploy failed. Unresponsive node")
135             return
136
137         if self.get("cleanProcesses"):
138             self.clean_processes()
139
140         if self.get("cleanHome"):
141             self.clean_home()
142        
143         self.mkdir(self.node_home)
144
145         super(LinuxNode, self).provision()
146
147     def deploy(self):
148         if self.state == ResourceState.NEW:
149             try:
150                self.discover()
151                self.provision()
152             except:
153                 self._state = ResourceState.FAILED
154                 raise
155
156         # Node needs to wait until all associated interfaces are 
157         # ready before it can finalize deployment
158         from neco.resources.linux.interface import LinuxInterface
159         ifaces = self.get_connected(LinuxInterface.rtype())
160         for iface in ifaces:
161             if iface.state < ResourceState.READY:
162                 self.ec.schedule(reschedule_delay, self.deploy)
163                 return 
164
165         super(LinuxNode, self).deploy()
166
167     def release(self):
168         tear_down = self.get("tearDown")
169         if tear_down:
170             self.execute(tear_down)
171
172         super(LinuxNode, self).release()
173
174     def valid_connection(self, guid):
175         # TODO: Validate!
176         return True
177
178     def clean_processes(self, killer = False):
179         self.info("Cleaning up processes")
180         
181         if killer:
182             # Hardcore kill
183             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
184                 "sudo -S killall python tcpdump || /bin/true ; " +
185                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
186                 "sudo -S killall -u root || /bin/true ; " +
187                 "sudo -S killall -u root || /bin/true ; ")
188         else:
189             # Be gentler...
190             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
191                 "sudo -S killall tcpdump || /bin/true ; " +
192                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
193                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
194
195         out = err = ""
196         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
197             
198     def clean_home(self):
199         self.info("Cleaning up home")
200
201         cmd = ("cd %s ; " % self.home +
202             "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
203             " -execdir rm -rf {} + ")
204
205         out = err = ""
206         (out, err), proc = self.execute(cmd, with_lock = True)
207
208     def upload(self, src, dst, text = False):
209         """ Copy content to destination
210
211            src  content to copy. Can be a local file, directory or a list of files
212
213            dst  destination path on the remote host (remote is always self.host)
214
215            text src is text input, it must be stored into a temp file before uploading
216         """
217         # If source is a string input 
218         f = None
219         if text and not os.path.isfile(src):
220             # src is text input that should be uploaded as file
221             # create a temporal file with the content to upload
222             f = tempfile.NamedTemporaryFile(delete=False)
223             f.write(src)
224             f.close()
225             src = f.name
226
227         if not self.localhost:
228             # Build destination as <user>@<server>:<path>
229             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
230
231         result = self.copy(src, dst)
232
233         # clean up temp file
234         if f:
235             os.remove(f.name)
236
237         return result
238
239     def download(self, src, dst):
240         if not self.localhost:
241             # Build destination as <user>@<server>:<path>
242             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
243         return self.copy(src, dst)
244
245     def install_packages(self, packages, home = None):
246         home = home or self.node_home
247
248         cmd = ""
249         if self.os in ["f12", "f14"]:
250             cmd = rpmfuncs.install_packages_command(self.os, packages)
251         elif self.os in ["debian", "ubuntu"]:
252             cmd = debfuncs.install_packages_command(self.os, packages)
253         else:
254             msg = "Error installing packages ( OS not known ) "
255             self.error(msg, self.os)
256             raise RuntimeError, msg
257
258         out = err = ""
259         (out, err), proc = self.run_and_wait(cmd, home, 
260             pidfile = "instpkg_pid",
261             stdout = "instpkg_out", 
262             stderr = "instpkg_err",
263             raise_on_error = True)
264
265         return (out, err), proc 
266
267     def remove_packages(self, packages, home = None):
268         home = home or self.node_home
269
270         cmd = ""
271         if self.os in ["f12", "f14"]:
272             cmd = rpmfuncs.remove_packages_command(self.os, packages)
273         elif self.os in ["debian", "ubuntu"]:
274             cmd = debfuncs.remove_packages_command(self.os, packages)
275         else:
276             msg = "Error removing packages ( OS not known ) "
277             self.error(msg)
278             raise RuntimeError, msg
279
280         out = err = ""
281         (out, err), proc = self.run_and_wait(cmd, home, 
282             pidfile = "rmpkg_pid",
283             stdout = "rmpkg_out", 
284             stderr = "rmpkg_err",
285             raise_on_error = True)
286          
287         return (out, err), proc 
288
289     def mkdir(self, path, clean = False):
290         if clean:
291             self.rmdir(path)
292
293         return self.execute("mkdir -p %s" % path, with_lock = True)
294
295     def rmdir(self, path):
296         return self.execute("rm -rf %s" % path, with_lock = True)
297
298     def run_and_wait(self, command, 
299             home = ".", 
300             pidfile = "pid", 
301             stdin = None, 
302             stdout = 'stdout', 
303             stderr = 'stderr', 
304             sudo = False,
305             tty = False,
306             raise_on_error = False):
307         """ runs a command in background on the remote host, but waits
308             until the command finishes execution.
309             This is more robust than doing a simple synchronized 'execute',
310             since in the remote host the command can continue to run detached
311             even if network disconnections occur
312         """
313         # run command in background in remote host
314         (out, err), proc = self.run(command, home, 
315                 pidfile = pidfile,
316                 stdin = stdin, 
317                 stdout = stdout, 
318                 stderr = stderr, 
319                 sudo = sudo,
320                 tty = tty)
321
322         # check no errors occurred
323         if proc.poll() and err:
324             msg = " Failed to run command '%s' " % command
325             self.error(msg, out, err)
326             if raise_on_error:
327                 raise RuntimeError, msg
328
329         # Wait for pid file to be generated
330         pid, ppid = self.wait_pid(
331                 home = home, 
332                 pidfile = pidfile, 
333                 raise_on_error = raise_on_error)
334
335         # wait until command finishes to execute
336         self.wait_run(pid, ppid)
337        
338         # check if execution errors occurred
339         (out, err), proc = self.check_output(home, stderr)
340
341         if err or out:
342             msg = " Failed to run command '%s' " % command
343             self.error(msg, out, err)
344
345             if raise_on_error:
346                 raise RuntimeError, msg
347         
348         return (out, err), proc
349  
350     def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
351         """ Waits until the pid file for the command is generated, 
352             and returns the pid and ppid of the process """
353         pid = ppid = None
354         delay = 1.0
355         for i in xrange(5):
356             pidtuple = self.checkpid(home = home, pidfile = pidfile)
357             
358             if pidtuple:
359                 pid, ppid = pidtuple
360                 break
361             else:
362                 time.sleep(delay)
363                 delay = min(30,delay*1.2)
364         else:
365             msg = " Failed to get pid for pidfile %s/%s " % (
366                     home, pidfile )
367             self.error(msg)
368             
369             if raise_on_error:
370                 raise RuntimeError, msg
371
372         return pid, ppid
373
374     def wait_run(self, pid, ppid, trial = 0):
375         """ wait for a remote process to finish execution """
376         delay = 1.0
377         first = True
378         bustspin = 0
379
380         while True:
381             status = self.status(pid, ppid)
382             
383             if status is sshfuncs.FINISHED:
384                 break
385             elif status is not sshfuncs.RUNNING:
386                 bustspin += 1
387                 time.sleep(delay*(5.5+random.random()))
388                 if bustspin > 12:
389                     break
390             else:
391                 if first:
392                     first = False
393
394                 time.sleep(delay*(0.5+random.random()))
395                 delay = min(30,delay*1.2)
396                 bustspin = 0
397
398     def check_output(self, home, filename):
399         """ checks file content """
400         (out, err), proc = self.execute("cat %s" % 
401             os.path.join(home, filename), retry = 1, with_lock = True)
402         return (out, err), proc
403
404     def is_alive(self):
405         if self.localhost:
406             return True
407
408         out = err = ""
409         try:
410             (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True)
411         except:
412             import traceback
413             trace = traceback.format_exc()
414             msg = "Unresponsive host "
415             self.warn(msg, out, trace)
416             return False
417
418         if out.strip().startswith('ALIVE'):
419             return True
420         else:
421             msg = "Unresponsive host "
422             self.warn(msg, out, err)
423             return False
424
425             # TODO!
426             #if self.check_bad_host(out,err):
427             #    self.blacklist()
428
429     def copy(self, src, dst):
430         if self.localhost:
431             (out, err), proc =  execfuncs.lcopy(source, dest, 
432                     recursive = True)
433         else:
434             with self._lock:
435                 (out, err), proc = sshfuncs.rcopy(
436                     src, dst, 
437                     port = self.get("port"),
438                     identity = self.get("identity"),
439                     server_key = self.get("serverKey"),
440                     recursive = True)
441
442         return (out, err), proc
443
444     def execute(self, command,
445             sudo = False,
446             stdin = None, 
447             env = None,
448             tty = False,
449             forward_x11 = False,
450             timeout = None,
451             retry = 3,
452             err_on_timeout = True,
453             connect_timeout = 30,
454             persistent = True,
455             with_lock = False
456             ):
457         """ Notice that this invocation will block until the
458         execution finishes. If this is not the desired behavior,
459         use 'run' instead."""
460
461         if self.localhost:
462             (out, err), proc = execfuncs.lexec(command, 
463                     user = user,
464                     sudo = sudo,
465                     stdin = stdin,
466                     env = env)
467         else:
468             if with_lock:
469                 with self._lock:
470                     (out, err), proc = sshfuncs.rexec(
471                         command, 
472                         host = self.get("hostname"),
473                         user = self.get("username"),
474                         port = self.get("port"),
475                         agent = True,
476                         sudo = sudo,
477                         stdin = stdin,
478                         identity = self.get("identity"),
479                         server_key = self.get("serverKey"),
480                         env = env,
481                         tty = tty,
482                         forward_x11 = forward_x11,
483                         timeout = timeout,
484                         retry = retry,
485                         err_on_timeout = err_on_timeout,
486                         connect_timeout = connect_timeout,
487                         persistent = persistent
488                         )
489             else:
490                 (out, err), proc = sshfuncs.rexec(
491                     command, 
492                     host = self.get("hostname"),
493                     user = self.get("username"),
494                     port = self.get("port"),
495                     agent = True,
496                     sudo = sudo,
497                     stdin = stdin,
498                     identity = self.get("identity"),
499                     server_key = self.get("serverKey"),
500                     env = env,
501                     tty = tty,
502                     forward_x11 = forward_x11,
503                     timeout = timeout,
504                     retry = retry,
505                     err_on_timeout = err_on_timeout,
506                     connect_timeout = connect_timeout,
507                     persistent = persistent
508                     )
509
510         return (out, err), proc
511
512     def run(self, command, 
513             home = None,
514             create_home = True,
515             pidfile = "pid",
516             stdin = None, 
517             stdout = 'stdout', 
518             stderr = 'stderr', 
519             sudo = False,
520             tty = False):
521
522         self.debug("Running command '%s'" % command)
523         
524         if self.localhost:
525             (out, err), proc = execfuncs.lspawn(command, pidfile, 
526                     stdout = stdout, 
527                     stderr = stderr, 
528                     stdin = stdin, 
529                     home = home, 
530                     create_home = create_home, 
531                     sudo = sudo,
532                     user = user) 
533         else:
534             # Start process in a "daemonized" way, using nohup and heavy
535             # stdin/out redirection to avoid connection issues
536             with self._lock:
537                 (out,err), proc = sshfuncs.rspawn(
538                     command,
539                     pidfile = pidfile,
540                     home = home,
541                     create_home = create_home,
542                     stdin = stdin if stdin is not None else '/dev/null',
543                     stdout = stdout if stdout else '/dev/null',
544                     stderr = stderr if stderr else '/dev/null',
545                     sudo = sudo,
546                     host = self.get("hostname"),
547                     user = self.get("username"),
548                     port = self.get("port"),
549                     agent = True,
550                     identity = self.get("identity"),
551                     server_key = self.get("serverKey"),
552                     tty = tty
553                     )
554
555         return (out, err), proc
556
557     def checkpid(self, home = ".", pidfile = "pid"):
558         if self.localhost:
559             pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
560         else:
561             with self._lock:
562                 pidtuple = sshfuncs.rcheckpid(
563                     os.path.join(home, pidfile),
564                     host = self.get("hostname"),
565                     user = self.get("username"),
566                     port = self.get("port"),
567                     agent = True,
568                     identity = self.get("identity"),
569                     server_key = self.get("serverKey")
570                     )
571         
572         return pidtuple
573     
574     def status(self, pid, ppid):
575         if self.localhost:
576             status = execfuncs.lstatus(pid, ppid)
577         else:
578             with self._lock:
579                 status = sshfuncs.rstatus(
580                         pid, ppid,
581                         host = self.get("hostname"),
582                         user = self.get("username"),
583                         port = self.get("port"),
584                         agent = True,
585                         identity = self.get("identity"),
586                         server_key = self.get("serverKey")
587                         )
588            
589         return status
590     
591     def kill(self, pid, ppid, sudo = False):
592         out = err = ""
593         proc = None
594         status = self.status(pid, ppid)
595
596         if status == sshfuncs.RUNNING:
597             if self.localhost:
598                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
599             else:
600                 with self._lock:
601                     (out, err), proc = sshfuncs.rkill(
602                         pid, ppid,
603                         host = self.get("hostname"),
604                         user = self.get("username"),
605                         port = self.get("port"),
606                         agent = True,
607                         sudo = sudo,
608                         identity = self.get("identity"),
609                         server_key = self.get("serverKey")
610                         )
611         return (out, err), proc
612
613     def check_bad_host(self, out, err):
614         badre = re.compile(r'(?:'
615                            r'|Error: disk I/O error'
616                            r')', 
617                            re.I)
618         return badre.search(out) or badre.search(err)
619
620     def blacklist(self):
621         # TODO!!!!
622         self.warn(" Blacklisting malfunctioning node ")
623         #import util
624         #util.appendBlacklist(self.hostname)
625