Fixed relative paths in Linux Application
[nepi.git] / src / neco / resources / linux / node.py
1 from neco.execution.attribute import Attribute, Flags
2 from neco.execution.resource import ResourceManager, clsinit, ResourceState
3 from neco.resources.linux import rpmfuncs, debfuncs 
4 from neco.util import sshfuncs, execfuncs 
5
6 import collections
7 import logging
8 import os
9 import random
10 import re
11 import tempfile
12 import time
13 import threading
14
15 # TODO: Verify files and dirs exists already
16 # TODO: Blacklist nodes!
17 # TODO: Unify delays!!
18
19 reschedule_delay = "0.5s"
20
21
22 @clsinit
23 class LinuxNode(ResourceManager):
24     _rtype = "LinuxNode"
25
26     @classmethod
27     def _register_attributes(cls):
28         hostname = Attribute("hostname", "Hostname of the machine",
29                 flags = Flags.ExecReadOnly)
30
31         username = Attribute("username", "Local account username", 
32                 flags = Flags.Credential)
33
34         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
35         
36         home = Attribute("home",
37                 "Experiment home directory to store all experiment related files",
38                 flags = Flags.ExecReadOnly)
39         
40         identity = Attribute("identity", "SSH identity file",
41                 flags = Flags.Credential)
42         
43         server_key = Attribute("serverKey", "Server public key", 
44                 flags = Flags.ExecReadOnly)
45         
46         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
47                 " from home folder before starting experiment", 
48                 flags = Flags.ExecReadOnly)
49         
50         clean_processes = Attribute("cleanProcesses", 
51                 "Kill all running processes before starting experiment",
52                 flags = Flags.ExecReadOnly)
53         
54         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
55                 "releasing the resource",
56                 flags = Flags.ExecReadOnly)
57
58         cls._register_attribute(hostname)
59         cls._register_attribute(username)
60         cls._register_attribute(port)
61         cls._register_attribute(home)
62         cls._register_attribute(identity)
63         cls._register_attribute(server_key)
64         cls._register_attribute(clean_home)
65         cls._register_attribute(clean_processes)
66         cls._register_attribute(tear_down)
67
68     def __init__(self, ec, guid):
69         super(LinuxNode, self).__init__(ec, guid)
70         self._os = None
71         
72         # lock to avoid concurrency issues on methods used by applications 
73         self._lock = threading.Lock()
74
75         self._logger = logging.getLogger("LinuxNode")
76     
77     def log_message(self, msg):
78         return " guid %d - host %s - %s " % (self.guid, 
79                 self.get("hostname"), msg)
80
81     @property
82     def home(self):
83         return self.get("home") or ""
84
85     @property
86     def exp_home(self):
87         return os.path.join(self.home, self.ec.exp_id)
88
89     @property
90     def node_home(self):
91         node_home = "node-%d" % self.guid
92         return os.path.join(self.exp_home, node_home)
93
94     @property
95     def os(self):
96         if self._os:
97             return self._os
98
99         if (not self.get("hostname") or not self.get("username")):
100             msg = "Can't resolve OS, insufficient data "
101             self.error(msg)
102             raise RuntimeError, msg
103
104         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
105
106         if err and proc.poll():
107             msg = "Error detecting OS "
108             self.error(msg, out, err)
109             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
110
111         if out.find("Fedora release 12") == 0:
112             self._os = "f12"
113         elif out.find("Fedora release 14") == 0:
114             self._os = "f14"
115         elif out.find("Debian") == 0: 
116             self._os = "debian"
117         elif out.find("Ubuntu") ==0:
118             self._os = "ubuntu"
119         else:
120             msg = "Unsupported OS"
121             self.error(msg, out)
122             raise RuntimeError, "%s - %s " %( msg, out )
123
124         return self._os
125
126     @property
127     def localhost(self):
128         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
129
130     def provision(self, filters = None):
131         if not self.is_alive():
132             self._state = ResourceState.FAILED
133             self.error("Deploy failed. Unresponsive node")
134             return
135
136         if self.get("cleanProcesses"):
137             self.clean_processes()
138
139         if self.get("cleanHome"):
140             self.clean_home()
141        
142         self.mkdir(self.node_home)
143
144         super(LinuxNode, self).provision()
145
146     def deploy(self):
147         if self.state == ResourceState.NEW:
148             try:
149                self.discover()
150                self.provision()
151             except:
152                 self._state = ResourceState.FAILED
153                 raise
154
155         # Node needs to wait until all associated interfaces are 
156         # ready before it can finalize deployment
157         from neco.resources.linux.interface import LinuxInterface
158         ifaces = self.get_connected(LinuxInterface.rtype())
159         for iface in ifaces:
160             if iface.state < ResourceState.READY:
161                 self.ec.schedule(reschedule_delay, self.deploy)
162                 return 
163
164         super(LinuxNode, self).deploy()
165
166     def release(self):
167         tear_down = self.get("tearDown")
168         if tear_down:
169             self.execute(tear_down)
170
171         super(LinuxNode, self).release()
172
173     def valid_connection(self, guid):
174         # TODO: Validate!
175         return True
176
177     def clean_processes(self, killer = False):
178         self.info("Cleaning up processes")
179         
180         if killer:
181             # Hardcore kill
182             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
183                 "sudo -S killall python tcpdump || /bin/true ; " +
184                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
185                 "sudo -S killall -u root || /bin/true ; " +
186                 "sudo -S killall -u root || /bin/true ; ")
187         else:
188             # Be gentler...
189             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
190                 "sudo -S killall tcpdump || /bin/true ; " +
191                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
192                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
193
194         out = err = ""
195         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
196             
197     def clean_home(self):
198         self.info("Cleaning up home")
199         
200         cmd = (
201             # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
202             "find . -maxdepth 1 -name 'nepi-*' " +
203             " -execdir rm -rf {} + "
204             )
205             
206         if self.home:
207             cmd = "cd %s ; " % self.home + cmd
208
209         out = err = ""
210         (out, err), proc = self.execute(cmd, with_lock = True)
211
212     def upload(self, src, dst, text = False):
213         """ Copy content to destination
214
215            src  content to copy. Can be a local file, directory or a list of files
216
217            dst  destination path on the remote host (remote is always self.host)
218
219            text src is text input, it must be stored into a temp file before uploading
220         """
221         # If source is a string input 
222         f = None
223         if text and not os.path.isfile(src):
224             # src is text input that should be uploaded as file
225             # create a temporal file with the content to upload
226             f = tempfile.NamedTemporaryFile(delete=False)
227             f.write(src)
228             f.close()
229             src = f.name
230
231         if not self.localhost:
232             # Build destination as <user>@<server>:<path>
233             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
234
235         result = self.copy(src, dst)
236
237         # clean up temp file
238         if f:
239             os.remove(f.name)
240
241         return result
242
243     def download(self, src, dst):
244         if not self.localhost:
245             # Build destination as <user>@<server>:<path>
246             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
247         return self.copy(src, dst)
248
249     def install_packages(self, packages, home = None):
250         home = home or self.node_home
251
252         cmd = ""
253         if self.os in ["f12", "f14"]:
254             cmd = rpmfuncs.install_packages_command(self.os, packages)
255         elif self.os in ["debian", "ubuntu"]:
256             cmd = debfuncs.install_packages_command(self.os, packages)
257         else:
258             msg = "Error installing packages ( OS not known ) "
259             self.error(msg, self.os)
260             raise RuntimeError, msg
261
262         out = err = ""
263         (out, err), proc = self.run_and_wait(cmd, home, 
264             pidfile = "instpkg_pid",
265             stdout = "instpkg_out", 
266             stderr = "instpkg_err",
267             raise_on_error = True)
268
269         return (out, err), proc 
270
271     def remove_packages(self, packages, home = None):
272         home = home or self.node_home
273
274         cmd = ""
275         if self.os in ["f12", "f14"]:
276             cmd = rpmfuncs.remove_packages_command(self.os, packages)
277         elif self.os in ["debian", "ubuntu"]:
278             cmd = debfuncs.remove_packages_command(self.os, packages)
279         else:
280             msg = "Error removing packages ( OS not known ) "
281             self.error(msg)
282             raise RuntimeError, msg
283
284         out = err = ""
285         (out, err), proc = self.run_and_wait(cmd, home, 
286             pidfile = "rmpkg_pid",
287             stdout = "rmpkg_out", 
288             stderr = "rmpkg_err",
289             raise_on_error = True)
290          
291         return (out, err), proc 
292
293     def mkdir(self, path, clean = False):
294         if clean:
295             self.rmdir(path)
296
297         return self.execute("mkdir -p %s" % path, with_lock = True)
298
299     def rmdir(self, path):
300         return self.execute("rm -rf %s" % path, with_lock = True)
301
302     def run_and_wait(self, command, 
303             home = ".", 
304             pidfile = "pid", 
305             stdin = None, 
306             stdout = 'stdout', 
307             stderr = 'stderr', 
308             sudo = False,
309             tty = False,
310             raise_on_error = False):
311         """ runs a command in background on the remote host, but waits
312             until the command finishes execution.
313             This is more robust than doing a simple synchronized 'execute',
314             since in the remote host the command can continue to run detached
315             even if network disconnections occur
316         """
317         # run command in background in remote host
318         (out, err), proc = self.run(command, home, 
319                 pidfile = pidfile,
320                 stdin = stdin, 
321                 stdout = stdout, 
322                 stderr = stderr, 
323                 sudo = sudo,
324                 tty = tty)
325
326         # check no errors occurred
327         if proc.poll() and err:
328             msg = " Failed to run command '%s' " % command
329             self.error(msg, out, err)
330             if raise_on_error:
331                 raise RuntimeError, msg
332
333         # Wait for pid file to be generated
334         pid, ppid = self.wait_pid(
335                 home = home, 
336                 pidfile = pidfile, 
337                 raise_on_error = raise_on_error)
338
339         # wait until command finishes to execute
340         self.wait_run(pid, ppid)
341        
342         # check if execution errors occurred
343         (out, err), proc = self.check_output(home, stderr)
344
345         if err or out:
346             msg = " Failed to run command '%s' " % command
347             self.error(msg, out, err)
348
349             if raise_on_error:
350                 raise RuntimeError, msg
351         
352         return (out, err), proc
353  
354     def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
355         """ Waits until the pid file for the command is generated, 
356             and returns the pid and ppid of the process """
357         pid = ppid = None
358         delay = 1.0
359         for i in xrange(5):
360             pidtuple = self.checkpid(home = home, pidfile = pidfile)
361             
362             if pidtuple:
363                 pid, ppid = pidtuple
364                 break
365             else:
366                 time.sleep(delay)
367                 delay = min(30,delay*1.2)
368         else:
369             msg = " Failed to get pid for pidfile %s/%s " % (
370                     home, pidfile )
371             self.error(msg)
372             
373             if raise_on_error:
374                 raise RuntimeError, msg
375
376         return pid, ppid
377
378     def wait_run(self, pid, ppid, trial = 0):
379         """ wait for a remote process to finish execution """
380         delay = 1.0
381         first = True
382         bustspin = 0
383
384         while True:
385             status = self.status(pid, ppid)
386             
387             if status is sshfuncs.FINISHED:
388                 break
389             elif status is not sshfuncs.RUNNING:
390                 bustspin += 1
391                 time.sleep(delay*(5.5+random.random()))
392                 if bustspin > 12:
393                     break
394             else:
395                 if first:
396                     first = False
397
398                 time.sleep(delay*(0.5+random.random()))
399                 delay = min(30,delay*1.2)
400                 bustspin = 0
401
402     def check_output(self, home, filename):
403         """ checks file content """
404         (out, err), proc = self.execute("cat %s" % 
405             os.path.join(home, filename), retry = 1, with_lock = True)
406         return (out, err), proc
407
408     def is_alive(self):
409         if self.localhost:
410             return True
411
412         out = err = ""
413         try:
414             (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True)
415         except:
416             import traceback
417             trace = traceback.format_exc()
418             msg = "Unresponsive host "
419             self.warn(msg, out, trace)
420             return False
421
422         if out.strip().startswith('ALIVE'):
423             return True
424         else:
425             msg = "Unresponsive host "
426             self.warn(msg, out, err)
427             return False
428
429             # TODO!
430             #if self.check_bad_host(out,err):
431             #    self.blacklist()
432
433     def copy(self, src, dst):
434         if self.localhost:
435             (out, err), proc =  execfuncs.lcopy(source, dest, 
436                     recursive = True)
437         else:
438             with self._lock:
439                 (out, err), proc = sshfuncs.rcopy(
440                     src, dst, 
441                     port = self.get("port"),
442                     identity = self.get("identity"),
443                     server_key = self.get("serverKey"),
444                     recursive = True)
445
446         return (out, err), proc
447
448     def execute(self, command,
449             sudo = False,
450             stdin = None, 
451             env = None,
452             tty = False,
453             forward_x11 = False,
454             timeout = None,
455             retry = 3,
456             err_on_timeout = True,
457             connect_timeout = 30,
458             persistent = True,
459             with_lock = False
460             ):
461         """ Notice that this invocation will block until the
462         execution finishes. If this is not the desired behavior,
463         use 'run' instead."""
464
465         if self.localhost:
466             (out, err), proc = execfuncs.lexec(command, 
467                     user = user,
468                     sudo = sudo,
469                     stdin = stdin,
470                     env = env)
471         else:
472             if with_lock:
473                 with self._lock:
474                     (out, err), proc = sshfuncs.rexec(
475                         command, 
476                         host = self.get("hostname"),
477                         user = self.get("username"),
478                         port = self.get("port"),
479                         agent = True,
480                         sudo = sudo,
481                         stdin = stdin,
482                         identity = self.get("identity"),
483                         server_key = self.get("serverKey"),
484                         env = env,
485                         tty = tty,
486                         forward_x11 = forward_x11,
487                         timeout = timeout,
488                         retry = retry,
489                         err_on_timeout = err_on_timeout,
490                         connect_timeout = connect_timeout,
491                         persistent = persistent
492                         )
493             else:
494                 (out, err), proc = sshfuncs.rexec(
495                     command, 
496                     host = self.get("hostname"),
497                     user = self.get("username"),
498                     port = self.get("port"),
499                     agent = True,
500                     sudo = sudo,
501                     stdin = stdin,
502                     identity = self.get("identity"),
503                     server_key = self.get("serverKey"),
504                     env = env,
505                     tty = tty,
506                     forward_x11 = forward_x11,
507                     timeout = timeout,
508                     retry = retry,
509                     err_on_timeout = err_on_timeout,
510                     connect_timeout = connect_timeout,
511                     persistent = persistent
512                     )
513
514         return (out, err), proc
515
516     def run(self, command, 
517             home = None,
518             create_home = False,
519             pidfile = "pid",
520             stdin = None, 
521             stdout = 'stdout', 
522             stderr = 'stderr', 
523             sudo = False,
524             tty = False):
525
526         self.debug("Running command '%s'" % command)
527         
528         if self.localhost:
529             (out, err), proc = execfuncs.lspawn(command, pidfile, 
530                     stdout = stdout, 
531                     stderr = stderr, 
532                     stdin = stdin, 
533                     home = home, 
534                     create_home = create_home, 
535                     sudo = sudo,
536                     user = user) 
537         else:
538             # Start process in a "daemonized" way, using nohup and heavy
539             # stdin/out redirection to avoid connection issues
540             with self._lock:
541                 (out,err), proc = sshfuncs.rspawn(
542                     command,
543                     pidfile = pidfile,
544                     home = home,
545                     create_home = create_home,
546                     stdin = stdin if stdin is not None else '/dev/null',
547                     stdout = stdout if stdout else '/dev/null',
548                     stderr = stderr if stderr else '/dev/null',
549                     sudo = sudo,
550                     host = self.get("hostname"),
551                     user = self.get("username"),
552                     port = self.get("port"),
553                     agent = True,
554                     identity = self.get("identity"),
555                     server_key = self.get("serverKey"),
556                     tty = tty
557                     )
558
559         return (out, err), proc
560
561     def checkpid(self, home = ".", pidfile = "pid"):
562         if self.localhost:
563             pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
564         else:
565             with self._lock:
566                 pidtuple = sshfuncs.rcheckpid(
567                     os.path.join(home, pidfile),
568                     host = self.get("hostname"),
569                     user = self.get("username"),
570                     port = self.get("port"),
571                     agent = True,
572                     identity = self.get("identity"),
573                     server_key = self.get("serverKey")
574                     )
575         
576         return pidtuple
577     
578     def status(self, pid, ppid):
579         if self.localhost:
580             status = execfuncs.lstatus(pid, ppid)
581         else:
582             with self._lock:
583                 status = sshfuncs.rstatus(
584                         pid, ppid,
585                         host = self.get("hostname"),
586                         user = self.get("username"),
587                         port = self.get("port"),
588                         agent = True,
589                         identity = self.get("identity"),
590                         server_key = self.get("serverKey")
591                         )
592            
593         return status
594     
595     def kill(self, pid, ppid, sudo = False):
596         out = err = ""
597         proc = None
598         status = self.status(pid, ppid)
599
600         if status == sshfuncs.RUNNING:
601             if self.localhost:
602                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
603             else:
604                 with self._lock:
605                     (out, err), proc = sshfuncs.rkill(
606                         pid, ppid,
607                         host = self.get("hostname"),
608                         user = self.get("username"),
609                         port = self.get("port"),
610                         agent = True,
611                         sudo = sudo,
612                         identity = self.get("identity"),
613                         server_key = self.get("serverKey")
614                         )
615         return (out, err), proc
616
617     def check_bad_host(self, out, err):
618         badre = re.compile(r'(?:'
619                            r'|Error: disk I/O error'
620                            r')', 
621                            re.I)
622         return badre.search(out) or badre.search(err)
623
624     def blacklist(self):
625         # TODO!!!!
626         self.warn(" Blacklisting malfunctioning node ")
627         #import util
628         #util.appendBlacklist(self.hostname)
629