Replacing string "neco" by "nepi"
[nepi.git] / src / nepi / resources / linux / node.py
1 from nepi.execution.attribute import Attribute, Flags
2 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
3 from nepi.resources.linux import rpmfuncs, debfuncs 
4 from nepi.util import sshfuncs, execfuncs 
5
6 import collections
7 import logging
8 import os
9 import random
10 import re
11 import tempfile
12 import time
13 import threading
14
15 # TODO: Verify files and dirs exists already
16 # TODO: Blacklist nodes!
17 # TODO: Unify delays!!
18 # TODO: Validate outcome of uploads!! 
19
20 reschedule_delay = "0.5s"
21
22
23 @clsinit
24 class LinuxNode(ResourceManager):
25     _rtype = "LinuxNode"
26
27     @classmethod
28     def _register_attributes(cls):
29         hostname = Attribute("hostname", "Hostname of the machine",
30                 flags = Flags.ExecReadOnly)
31
32         username = Attribute("username", "Local account username", 
33                 flags = Flags.Credential)
34
35         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
36         
37         home = Attribute("home",
38                 "Experiment home directory to store all experiment related files",
39                 flags = Flags.ExecReadOnly)
40         
41         identity = Attribute("identity", "SSH identity file",
42                 flags = Flags.Credential)
43         
44         server_key = Attribute("serverKey", "Server public key", 
45                 flags = Flags.ExecReadOnly)
46         
47         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
48                 " from home folder before starting experiment", 
49                 flags = Flags.ExecReadOnly)
50         
51         clean_processes = Attribute("cleanProcesses", 
52                 "Kill all running processes before starting experiment",
53                 flags = Flags.ExecReadOnly)
54         
55         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
56                 "releasing the resource",
57                 flags = Flags.ExecReadOnly)
58
59         cls._register_attribute(hostname)
60         cls._register_attribute(username)
61         cls._register_attribute(port)
62         cls._register_attribute(home)
63         cls._register_attribute(identity)
64         cls._register_attribute(server_key)
65         cls._register_attribute(clean_home)
66         cls._register_attribute(clean_processes)
67         cls._register_attribute(tear_down)
68
69     def __init__(self, ec, guid):
70         super(LinuxNode, self).__init__(ec, guid)
71         self._os = None
72         
73         # lock to avoid concurrency issues on methods used by applications 
74         self._lock = threading.Lock()
75
76         self._logger = logging.getLogger("LinuxNode")
77     
78     def log_message(self, msg):
79         return " guid %d - host %s - %s " % (self.guid, 
80                 self.get("hostname"), msg)
81
82     @property
83     def home(self):
84         return self.get("home") or ""
85
86     @property
87     def exp_home(self):
88         return os.path.join(self.home, self.ec.exp_id)
89
90     @property
91     def node_home(self):
92         node_home = "node-%d" % self.guid
93         return os.path.join(self.exp_home, node_home)
94
95     @property
96     def os(self):
97         if self._os:
98             return self._os
99
100         if (not self.get("hostname") or not self.get("username")):
101             msg = "Can't resolve OS, insufficient data "
102             self.error(msg)
103             raise RuntimeError, msg
104
105         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
106
107         if err and proc.poll():
108             msg = "Error detecting OS "
109             self.error(msg, out, err)
110             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
111
112         if out.find("Fedora release 12") == 0:
113             self._os = "f12"
114         elif out.find("Fedora release 14") == 0:
115             self._os = "f14"
116         elif out.find("Debian") == 0: 
117             self._os = "debian"
118         elif out.find("Ubuntu") ==0:
119             self._os = "ubuntu"
120         else:
121             msg = "Unsupported OS"
122             self.error(msg, out)
123             raise RuntimeError, "%s - %s " %( msg, out )
124
125         return self._os
126
127     @property
128     def localhost(self):
129         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
130
131     def provision(self, filters = None):
132         if not self.is_alive():
133             self._state = ResourceState.FAILED
134             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
135             self.error(msg)
136             raise RuntimeError, msg
137
138         if self.get("cleanProcesses"):
139             self.clean_processes()
140
141         if self.get("cleanHome"):
142             self.clean_home()
143        
144         self.mkdir(self.node_home)
145
146         super(LinuxNode, self).provision()
147
148     def deploy(self):
149         if self.state == ResourceState.NEW:
150             try:
151                self.discover()
152                self.provision()
153             except:
154                 self._state = ResourceState.FAILED
155                 raise
156
157         # Node needs to wait until all associated interfaces are 
158         # ready before it can finalize deployment
159         from nepi.resources.linux.interface import LinuxInterface
160         ifaces = self.get_connected(LinuxInterface.rtype())
161         for iface in ifaces:
162             if iface.state < ResourceState.READY:
163                 self.ec.schedule(reschedule_delay, self.deploy)
164                 return 
165
166         super(LinuxNode, self).deploy()
167
168     def release(self):
169         tear_down = self.get("tearDown")
170         if tear_down:
171             self.execute(tear_down)
172
173         super(LinuxNode, self).release()
174
175     def valid_connection(self, guid):
176         # TODO: Validate!
177         return True
178
179     def clean_processes(self, killer = False):
180         self.info("Cleaning up processes")
181         
182         if killer:
183             # Hardcore kill
184             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
185                 "sudo -S killall python tcpdump || /bin/true ; " +
186                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
187                 "sudo -S killall -u root || /bin/true ; " +
188                 "sudo -S killall -u root || /bin/true ; ")
189         else:
190             # Be gentler...
191             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
192                 "sudo -S killall tcpdump || /bin/true ; " +
193                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
194                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
195
196         out = err = ""
197         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
198             
199     def clean_home(self):
200         self.info("Cleaning up home")
201         
202         cmd = (
203             # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
204             "find . -maxdepth 1 -name 'nepi-*' " +
205             " -execdir rm -rf {} + "
206             )
207             
208         if self.home:
209             cmd = "cd %s ; " % self.home + cmd
210
211         out = err = ""
212         (out, err), proc = self.execute(cmd, with_lock = True)
213
214     def upload(self, src, dst, text = False):
215         """ Copy content to destination
216
217            src  content to copy. Can be a local file, directory or a list of files
218
219            dst  destination path on the remote host (remote is always self.host)
220
221            text src is text input, it must be stored into a temp file before uploading
222         """
223         # If source is a string input 
224         f = None
225         if text and not os.path.isfile(src):
226             # src is text input that should be uploaded as file
227             # create a temporal file with the content to upload
228             f = tempfile.NamedTemporaryFile(delete=False)
229             f.write(src)
230             f.close()
231             src = f.name
232
233         if not self.localhost:
234             # Build destination as <user>@<server>:<path>
235             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
236
237         result = self.copy(src, dst)
238
239         # clean up temp file
240         if f:
241             os.remove(f.name)
242
243         return result
244
245     def download(self, src, dst):
246         if not self.localhost:
247             # Build destination as <user>@<server>:<path>
248             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
249         return self.copy(src, dst)
250
251     def install_packages(self, packages, home = None):
252         home = home or self.node_home
253
254         cmd = ""
255         if self.os in ["f12", "f14"]:
256             cmd = rpmfuncs.install_packages_command(self.os, packages)
257         elif self.os in ["debian", "ubuntu"]:
258             cmd = debfuncs.install_packages_command(self.os, packages)
259         else:
260             msg = "Error installing packages ( OS not known ) "
261             self.error(msg, self.os)
262             raise RuntimeError, msg
263
264         out = err = ""
265         (out, err), proc = self.run_and_wait(cmd, home, 
266             pidfile = "instpkg_pid",
267             stdout = "instpkg_out", 
268             stderr = "instpkg_err",
269             raise_on_error = True)
270
271         return (out, err), proc 
272
273     def remove_packages(self, packages, home = None):
274         home = home or self.node_home
275
276         cmd = ""
277         if self.os in ["f12", "f14"]:
278             cmd = rpmfuncs.remove_packages_command(self.os, packages)
279         elif self.os in ["debian", "ubuntu"]:
280             cmd = debfuncs.remove_packages_command(self.os, packages)
281         else:
282             msg = "Error removing packages ( OS not known ) "
283             self.error(msg)
284             raise RuntimeError, msg
285
286         out = err = ""
287         (out, err), proc = self.run_and_wait(cmd, home, 
288             pidfile = "rmpkg_pid",
289             stdout = "rmpkg_out", 
290             stderr = "rmpkg_err",
291             raise_on_error = True)
292          
293         return (out, err), proc 
294
295     def mkdir(self, path, clean = False):
296         if clean:
297             self.rmdir(path)
298
299         return self.execute("mkdir -p %s" % path, with_lock = True)
300
301     def rmdir(self, path):
302         return self.execute("rm -rf %s" % path, with_lock = True)
303
304     def run_and_wait(self, command, 
305             home = ".", 
306             pidfile = "pid", 
307             stdin = None, 
308             stdout = 'stdout', 
309             stderr = 'stderr', 
310             sudo = False,
311             tty = False,
312             raise_on_error = False):
313         """ runs a command in background on the remote host, but waits
314             until the command finishes execution.
315             This is more robust than doing a simple synchronized 'execute',
316             since in the remote host the command can continue to run detached
317             even if network disconnections occur
318         """
319         # run command in background in remote host
320         (out, err), proc = self.run(command, home, 
321                 pidfile = pidfile,
322                 stdin = stdin, 
323                 stdout = stdout, 
324                 stderr = stderr, 
325                 sudo = sudo,
326                 tty = tty)
327
328         # check no errors occurred
329         if proc.poll() and err:
330             msg = " Failed to run command '%s' " % command
331             self.error(msg, out, err)
332             if raise_on_error:
333                 raise RuntimeError, msg
334
335         # Wait for pid file to be generated
336         pid, ppid = self.wait_pid(
337                 home = home, 
338                 pidfile = pidfile, 
339                 raise_on_error = raise_on_error)
340
341         # wait until command finishes to execute
342         self.wait_run(pid, ppid)
343        
344         # check if execution errors occurred
345         (out, err), proc = self.check_output(home, stderr)
346
347         if err or out:
348             msg = " Failed to run command '%s' " % command
349             self.error(msg, out, err)
350
351             if raise_on_error:
352                 raise RuntimeError, msg
353         
354         return (out, err), proc
355  
356     def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
357         """ Waits until the pid file for the command is generated, 
358             and returns the pid and ppid of the process """
359         pid = ppid = None
360         delay = 1.0
361         for i in xrange(5):
362             pidtuple = self.checkpid(home = home, pidfile = pidfile)
363             
364             if pidtuple:
365                 pid, ppid = pidtuple
366                 break
367             else:
368                 time.sleep(delay)
369                 delay = min(30,delay*1.2)
370         else:
371             msg = " Failed to get pid for pidfile %s/%s " % (
372                     home, pidfile )
373             self.error(msg)
374             
375             if raise_on_error:
376                 raise RuntimeError, msg
377
378         return pid, ppid
379
380     def wait_run(self, pid, ppid, trial = 0):
381         """ wait for a remote process to finish execution """
382         delay = 1.0
383         first = True
384         bustspin = 0
385
386         while True:
387             status = self.status(pid, ppid)
388             
389             if status is sshfuncs.FINISHED:
390                 break
391             elif status is not sshfuncs.RUNNING:
392                 bustspin += 1
393                 time.sleep(delay*(5.5+random.random()))
394                 if bustspin > 12:
395                     break
396             else:
397                 if first:
398                     first = False
399
400                 time.sleep(delay*(0.5+random.random()))
401                 delay = min(30,delay*1.2)
402                 bustspin = 0
403
404     def check_output(self, home, filename):
405         """ checks file content """
406         (out, err), proc = self.execute("cat %s" % 
407             os.path.join(home, filename), retry = 1, with_lock = True)
408         return (out, err), proc
409
410     def is_alive(self):
411         if self.localhost:
412             return True
413
414         out = err = ""
415         try:
416             # TODO: FIX NOT ALIVE!!!!
417             (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
418                     with_lock = True)
419         except:
420             import traceback
421             trace = traceback.format_exc()
422             msg = "Unresponsive host  %s " % err
423             self.error(msg, out, trace)
424             return False
425
426         if out.strip().startswith('ALIVE'):
427             return True
428         else:
429             msg = "Unresponsive host "
430             self.error(msg, out, err)
431             return False
432
433     def copy(self, src, dst):
434         if self.localhost:
435             (out, err), proc =  execfuncs.lcopy(source, dest, 
436                     recursive = True,
437                     strict_host_checking = False)
438         else:
439             with self._lock:
440                 (out, err), proc = sshfuncs.rcopy(
441                     src, dst, 
442                     port = self.get("port"),
443                     identity = self.get("identity"),
444                     server_key = self.get("serverKey"),
445                     recursive = True,
446                     strict_host_checking = False)
447
448         return (out, err), proc
449
450     def execute(self, command,
451             sudo = False,
452             stdin = None, 
453             env = None,
454             tty = False,
455             forward_x11 = False,
456             timeout = None,
457             retry = 3,
458             err_on_timeout = True,
459             connect_timeout = 30,
460             strict_host_checking = False,
461             persistent = True,
462             with_lock = False
463             ):
464         """ Notice that this invocation will block until the
465         execution finishes. If this is not the desired behavior,
466         use 'run' instead."""
467
468         if self.localhost:
469             (out, err), proc = execfuncs.lexec(command, 
470                     user = user,
471                     sudo = sudo,
472                     stdin = stdin,
473                     env = env)
474         else:
475             if with_lock:
476                 with self._lock:
477                     (out, err), proc = sshfuncs.rexec(
478                         command, 
479                         host = self.get("hostname"),
480                         user = self.get("username"),
481                         port = self.get("port"),
482                         agent = True,
483                         sudo = sudo,
484                         stdin = stdin,
485                         identity = self.get("identity"),
486                         server_key = self.get("serverKey"),
487                         env = env,
488                         tty = tty,
489                         forward_x11 = forward_x11,
490                         timeout = timeout,
491                         retry = retry,
492                         err_on_timeout = err_on_timeout,
493                         connect_timeout = connect_timeout,
494                         persistent = persistent,
495                         strict_host_checking = strict_host_checking
496                         )
497             else:
498                 (out, err), proc = sshfuncs.rexec(
499                     command, 
500                     host = self.get("hostname"),
501                     user = self.get("username"),
502                     port = self.get("port"),
503                     agent = True,
504                     sudo = sudo,
505                     stdin = stdin,
506                     identity = self.get("identity"),
507                     server_key = self.get("serverKey"),
508                     env = env,
509                     tty = tty,
510                     forward_x11 = forward_x11,
511                     timeout = timeout,
512                     retry = retry,
513                     err_on_timeout = err_on_timeout,
514                     connect_timeout = connect_timeout,
515                     persistent = persistent
516                     )
517
518         return (out, err), proc
519
520     def run(self, command, 
521             home = None,
522             create_home = False,
523             pidfile = "pid",
524             stdin = None, 
525             stdout = 'stdout', 
526             stderr = 'stderr', 
527             sudo = False,
528             tty = False):
529
530         self.debug("Running command '%s'" % command)
531         
532         if self.localhost:
533             (out, err), proc = execfuncs.lspawn(command, pidfile, 
534                     stdout = stdout, 
535                     stderr = stderr, 
536                     stdin = stdin, 
537                     home = home, 
538                     create_home = create_home, 
539                     sudo = sudo,
540                     user = user) 
541         else:
542             # Start process in a "daemonized" way, using nohup and heavy
543             # stdin/out redirection to avoid connection issues
544             with self._lock:
545                 (out,err), proc = sshfuncs.rspawn(
546                     command,
547                     pidfile = pidfile,
548                     home = home,
549                     create_home = create_home,
550                     stdin = stdin if stdin is not None else '/dev/null',
551                     stdout = stdout if stdout else '/dev/null',
552                     stderr = stderr if stderr else '/dev/null',
553                     sudo = sudo,
554                     host = self.get("hostname"),
555                     user = self.get("username"),
556                     port = self.get("port"),
557                     agent = True,
558                     identity = self.get("identity"),
559                     server_key = self.get("serverKey"),
560                     tty = tty
561                     )
562
563         return (out, err), proc
564
565     def checkpid(self, home = ".", pidfile = "pid"):
566         if self.localhost:
567             pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
568         else:
569             with self._lock:
570                 pidtuple = sshfuncs.rcheckpid(
571                     os.path.join(home, pidfile),
572                     host = self.get("hostname"),
573                     user = self.get("username"),
574                     port = self.get("port"),
575                     agent = True,
576                     identity = self.get("identity"),
577                     server_key = self.get("serverKey")
578                     )
579         
580         return pidtuple
581     
582     def status(self, pid, ppid):
583         if self.localhost:
584             status = execfuncs.lstatus(pid, ppid)
585         else:
586             with self._lock:
587                 status = sshfuncs.rstatus(
588                         pid, ppid,
589                         host = self.get("hostname"),
590                         user = self.get("username"),
591                         port = self.get("port"),
592                         agent = True,
593                         identity = self.get("identity"),
594                         server_key = self.get("serverKey")
595                         )
596            
597         return status
598     
599     def kill(self, pid, ppid, sudo = False):
600         out = err = ""
601         proc = None
602         status = self.status(pid, ppid)
603
604         if status == sshfuncs.RUNNING:
605             if self.localhost:
606                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
607             else:
608                 with self._lock:
609                     (out, err), proc = sshfuncs.rkill(
610                         pid, ppid,
611                         host = self.get("hostname"),
612                         user = self.get("username"),
613                         port = self.get("port"),
614                         agent = True,
615                         sudo = sudo,
616                         identity = self.get("identity"),
617                         server_key = self.get("serverKey")
618                         )
619         return (out, err), proc
620
621     def check_bad_host(self, out, err):
622         badre = re.compile(r'(?:'
623                            r'|Error: disk I/O error'
624                            r')', 
625                            re.I)
626         return badre.search(out) or badre.search(err)
627
628     def blacklist(self):
629         # TODO!!!!
630         self.warn(" Blacklisting malfunctioning node ")
631         #import util
632         #util.appendBlacklist(self.hostname)
633