Adding help and background class atrributes to ResourceManager
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
22         reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.ExecReadOnly)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.ExecReadOnly)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.ExecReadOnly)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 flags = Flags.ExecReadOnly)
172
173         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
174                 " from a previous same experiment, before the new experiment starts", 
175                 flags = Flags.ExecReadOnly)
176         
177         clean_processes = Attribute("cleanProcesses", 
178                 "Kill all running processes before starting experiment",
179                 flags = Flags.ExecReadOnly)
180         
181         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
182                 "releasing the resource",
183                 flags = Flags.ExecReadOnly)
184
185         cls._register_attribute(hostname)
186         cls._register_attribute(username)
187         cls._register_attribute(port)
188         cls._register_attribute(home)
189         cls._register_attribute(identity)
190         cls._register_attribute(server_key)
191         cls._register_attribute(clean_home)
192         cls._register_attribute(clean_experiment)
193         cls._register_attribute(clean_processes)
194         cls._register_attribute(tear_down)
195
196     def __init__(self, ec, guid):
197         super(LinuxNode, self).__init__(ec, guid)
198         self._os = None
199         # home directory at Linux host
200         self._home_dir = ""
201         
202         # lock to prevent concurrent applications on the same node,
203         # to execute commands at the same time. There are potential
204         # concurrency issues when using SSH to a same host from 
205         # multiple threads. There are also possible operational 
206         # issues, e.g. an application querying the existence 
207         # of a file or folder prior to its creation, and another 
208         # application creating the same file or folder in between.
209         self._node_lock = threading.Lock()
210     
211     def log_message(self, msg):
212         return " guid %d - host %s - %s " % (self.guid, 
213                 self.get("hostname"), msg)
214
215     @property
216     def home_dir(self):
217         home = self.get("home") or ""
218         if not home.startswith("/"):
219            home = os.path.join(self._home_dir, home) 
220         return home
221
222     @property
223     def usr_dir(self):
224         return os.path.join(self.home_dir, "nepi-usr")
225
226     @property
227     def lib_dir(self):
228         return os.path.join(self.usr_dir, "lib")
229
230     @property
231     def bin_dir(self):
232         return os.path.join(self.usr_dir, "bin")
233
234     @property
235     def src_dir(self):
236         return os.path.join(self.usr_dir, "src")
237
238     @property
239     def share_dir(self):
240         return os.path.join(self.usr_dir, "share")
241
242     @property
243     def exp_dir(self):
244         return os.path.join(self.home_dir, "nepi-exp")
245
246     @property
247     def exp_home(self):
248         return os.path.join(self.exp_dir, self.ec.exp_id)
249
250     @property
251     def node_home(self):
252         return os.path.join(self.exp_home, "node-%d" % self.guid)
253
254     @property
255     def run_home(self):
256         return os.path.join(self.node_home, self.ec.run_id)
257
258     @property
259     def os(self):
260         if self._os:
261             return self._os
262
263         if (not self.get("hostname") or not self.get("username")):
264             msg = "Can't resolve OS, insufficient data "
265             self.error(msg)
266             raise RuntimeError, msg
267
268         out = self.get_os()
269
270         if out.find("Fedora release 8") == 0:
271             self._os = OSType.FEDORA_8
272         elif out.find("Fedora release 12") == 0:
273             self._os = OSType.FEDORA_12
274         elif out.find("Fedora release 14") == 0:
275             self._os = OSType.FEDORA_14
276         elif out.find("Debian") == 0: 
277             self._os = OSType.DEBIAN
278         elif out.find("Ubuntu") ==0:
279             self._os = OSType.UBUNTU
280         else:
281             msg = "Unsupported OS"
282             self.error(msg, out)
283             raise RuntimeError, "%s - %s " %( msg, out )
284
285         return self._os
286
287     def get_os(self):
288         # The underlying SSH layer will sometimes return an empty
289         # output (even if the command was executed without errors).
290         # To work arround this, repeat the operation N times or
291         # until the result is not empty string
292         out = ""
293         retrydelay = 1.0
294         for i in xrange(10):
295             try:
296                 (out, err), proc = self.execute("cat /etc/issue", 
297                         retry = 5,
298                         with_lock = True,
299                         blocking = True)
300
301                 if out.strip() != "":
302                     return out
303             except:
304                 trace = traceback.format_exc()
305                 msg = "Error detecting OS: %s " % trace
306                 self.error(msg, out, err)
307                 return False
308
309             time.sleep(min(30.0, retrydelay))
310             retrydelay *= 1.5
311
312
313     @property
314     def use_deb(self):
315         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
316
317     @property
318     def use_rpm(self):
319         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
320                 OSType.FEDORA]
321
322     @property
323     def localhost(self):
324         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
325
326     def provision(self):
327         # check if host is alive
328         if not self.is_alive():
329             self.fail()
330             
331             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
332             self.error(msg)
333             raise RuntimeError, msg
334
335         self.find_home()
336
337         if self.get("cleanProcesses"):
338             self.clean_processes()
339
340         if self.get("cleanHome"):
341             self.clean_home()
342  
343         if self.get("cleanExperiment"):
344             self.clean_experiment()
345     
346         # Create shared directory structure
347         self.mkdir(self.lib_dir)
348         self.mkdir(self.bin_dir)
349         self.mkdir(self.src_dir)
350         self.mkdir(self.share_dir)
351
352         # Create experiment node home directory
353         self.mkdir(self.node_home)
354
355         super(LinuxNode, self).provision()
356
357     def deploy(self):
358         if self.state == ResourceState.NEW:
359             try:
360                 self.discover()
361                 self.provision()
362             except:
363                 self.fail()
364                 raise
365
366         # Node needs to wait until all associated interfaces are 
367         # ready before it can finalize deployment
368         from nepi.resources.linux.interface import LinuxInterface
369         ifaces = self.get_connected(LinuxInterface.rtype())
370         for iface in ifaces:
371             if iface.state < ResourceState.READY:
372                 self.ec.schedule(reschedule_delay, self.deploy)
373                 return 
374
375         super(LinuxNode, self).deploy()
376
377     def release(self):
378         # Node needs to wait until all associated RMs are released
379         # to be released
380         rms = self.get_connected()
381         for rm in rms:
382             if rm.state < ResourceState.STOPPED:
383                 self.ec.schedule(reschedule_delay, self.release)
384                 return 
385
386         tear_down = self.get("tearDown")
387         if tear_down:
388             self.execute(tear_down)
389
390         self.clean_processes()
391
392         super(LinuxNode, self).release()
393
394     def valid_connection(self, guid):
395         # TODO: Validate!
396         return True
397
398     def clean_processes(self, killer = False):
399         self.info("Cleaning up processes")
400         
401         if killer:
402             # Hardcore kill
403             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
404                 "sudo -S killall python tcpdump || /bin/true ; " +
405                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
406                 "sudo -S killall -u root || /bin/true ; " +
407                 "sudo -S killall -u root || /bin/true ; ")
408         else:
409             # Be gentler...
410             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
411                 "sudo -S killall tcpdump || /bin/true ; " +
412                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
413                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
414
415         out = err = ""
416         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
417             
418     def clean_home(self):
419         """ Cleans all NEPI related folders in the Linux host
420         """
421         self.info("Cleaning up home")
422         
423         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
424                 self.home_dir )
425
426         return self.execute(cmd, with_lock = True)
427
428     def clean_experiment(self):
429         """ Cleans all experiment related files in the Linux host.
430         It preserves NEPI files and folders that have a multi experiment
431         scope.
432         """
433         self.info("Cleaning up experiment files")
434         
435         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
436                 self.exp_dir,
437                 self.ec.exp_id )
438             
439         return self.execute(cmd, with_lock = True)
440
441     def execute(self, command,
442             sudo = False,
443             stdin = None, 
444             env = None,
445             tty = False,
446             forward_x11 = False,
447             timeout = None,
448             retry = 3,
449             err_on_timeout = True,
450             connect_timeout = 30,
451             strict_host_checking = False,
452             persistent = True,
453             blocking = True,
454             with_lock = False
455             ):
456         """ Notice that this invocation will block until the
457         execution finishes. If this is not the desired behavior,
458         use 'run' instead."""
459
460         if self.localhost:
461             (out, err), proc = execfuncs.lexec(command, 
462                     user = user,
463                     sudo = sudo,
464                     stdin = stdin,
465                     env = env)
466         else:
467             if with_lock:
468                 with self._node_lock:
469                     (out, err), proc = sshfuncs.rexec(
470                         command, 
471                         host = self.get("hostname"),
472                         user = self.get("username"),
473                         port = self.get("port"),
474                         agent = True,
475                         sudo = sudo,
476                         stdin = stdin,
477                         identity = self.get("identity"),
478                         server_key = self.get("serverKey"),
479                         env = env,
480                         tty = tty,
481                         forward_x11 = forward_x11,
482                         timeout = timeout,
483                         retry = retry,
484                         err_on_timeout = err_on_timeout,
485                         connect_timeout = connect_timeout,
486                         persistent = persistent,
487                         blocking = blocking, 
488                         strict_host_checking = strict_host_checking
489                         )
490             else:
491                 (out, err), proc = sshfuncs.rexec(
492                     command, 
493                     host = self.get("hostname"),
494                     user = self.get("username"),
495                     port = self.get("port"),
496                     agent = True,
497                     sudo = sudo,
498                     stdin = stdin,
499                     identity = self.get("identity"),
500                     server_key = self.get("serverKey"),
501                     env = env,
502                     tty = tty,
503                     forward_x11 = forward_x11,
504                     timeout = timeout,
505                     retry = retry,
506                     err_on_timeout = err_on_timeout,
507                     connect_timeout = connect_timeout,
508                     persistent = persistent,
509                     blocking = blocking, 
510                     strict_host_checking = strict_host_checking
511                     )
512
513         return (out, err), proc
514
515     def run(self, command, home,
516             create_home = False,
517             pidfile = 'pidfile',
518             stdin = None, 
519             stdout = 'stdout', 
520             stderr = 'stderr', 
521             sudo = False,
522             tty = False):
523         
524         self.debug("Running command '%s'" % command)
525         
526         if self.localhost:
527             (out, err), proc = execfuncs.lspawn(command, pidfile, 
528                     stdout = stdout, 
529                     stderr = stderr, 
530                     stdin = stdin, 
531                     home = home, 
532                     create_home = create_home, 
533                     sudo = sudo,
534                     user = user) 
535         else:
536             with self._node_lock:
537                 (out, err), proc = sshfuncs.rspawn(
538                     command,
539                     pidfile = pidfile,
540                     home = home,
541                     create_home = create_home,
542                     stdin = stdin if stdin is not None else '/dev/null',
543                     stdout = stdout if stdout else '/dev/null',
544                     stderr = stderr if stderr else '/dev/null',
545                     sudo = sudo,
546                     host = self.get("hostname"),
547                     user = self.get("username"),
548                     port = self.get("port"),
549                     agent = True,
550                     identity = self.get("identity"),
551                     server_key = self.get("serverKey"),
552                     tty = tty
553                     )
554
555         return (out, err), proc
556
557     def getpid(self, home, pidfile = "pidfile"):
558         if self.localhost:
559             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
560         else:
561             with self._node_lock:
562                 pidtuple = sshfuncs.rgetpid(
563                     os.path.join(home, pidfile),
564                     host = self.get("hostname"),
565                     user = self.get("username"),
566                     port = self.get("port"),
567                     agent = True,
568                     identity = self.get("identity"),
569                     server_key = self.get("serverKey")
570                     )
571         
572         return pidtuple
573
574     def status(self, pid, ppid):
575         if self.localhost:
576             status = execfuncs.lstatus(pid, ppid)
577         else:
578             with self._node_lock:
579                 status = sshfuncs.rstatus(
580                         pid, ppid,
581                         host = self.get("hostname"),
582                         user = self.get("username"),
583                         port = self.get("port"),
584                         agent = True,
585                         identity = self.get("identity"),
586                         server_key = self.get("serverKey")
587                         )
588            
589         return status
590     
591     def kill(self, pid, ppid, sudo = False):
592         out = err = ""
593         proc = None
594         status = self.status(pid, ppid)
595
596         if status == sshfuncs.ProcStatus.RUNNING:
597             if self.localhost:
598                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
599             else:
600                 with self._node_lock:
601                     (out, err), proc = sshfuncs.rkill(
602                         pid, ppid,
603                         host = self.get("hostname"),
604                         user = self.get("username"),
605                         port = self.get("port"),
606                         agent = True,
607                         sudo = sudo,
608                         identity = self.get("identity"),
609                         server_key = self.get("serverKey")
610                         )
611
612         return (out, err), proc
613
614     def copy(self, src, dst):
615         if self.localhost:
616             (out, err), proc = execfuncs.lcopy(source, dest, 
617                     recursive = True,
618                     strict_host_checking = False)
619         else:
620             with self._node_lock:
621                 (out, err), proc = sshfuncs.rcopy(
622                     src, dst, 
623                     port = self.get("port"),
624                     identity = self.get("identity"),
625                     server_key = self.get("serverKey"),
626                     recursive = True,
627                     strict_host_checking = False)
628
629         return (out, err), proc
630
631
632     def upload(self, src, dst, text = False, overwrite = True):
633         """ Copy content to destination
634
635            src  content to copy. Can be a local file, directory or a list of files
636
637            dst  destination path on the remote host (remote is always self.host)
638
639            text src is text input, it must be stored into a temp file before uploading
640         """
641         # If source is a string input 
642         f = None
643         if text and not os.path.isfile(src):
644             # src is text input that should be uploaded as file
645             # create a temporal file with the content to upload
646             f = tempfile.NamedTemporaryFile(delete=False)
647             f.write(src)
648             f.close()
649             src = f.name
650
651         # If dst files should not be overwritten, check that the files do not
652         # exits already 
653         if overwrite == False:
654             src = self.filter_existing_files(src, dst)
655             if not src:
656                 return ("", ""), None 
657
658         if not self.localhost:
659             # Build destination as <user>@<server>:<path>
660             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
661
662         result = self.copy(src, dst)
663
664         # clean up temp file
665         if f:
666             os.remove(f.name)
667
668         return result
669
670     def download(self, src, dst):
671         if not self.localhost:
672             # Build destination as <user>@<server>:<path>
673             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
674         return self.copy(src, dst)
675
676     def install_packages_command(self, packages):
677         command = ""
678         if self.use_rpm:
679             command = rpmfuncs.install_packages_command(self.os, packages)
680         elif self.use_deb:
681             command = debfuncs.install_packages_command(self.os, packages)
682         else:
683             msg = "Error installing packages ( OS not known ) "
684             self.error(msg, self.os)
685             raise RuntimeError, msg
686
687         return command
688
689     def install_packages(self, packages, home, run_home = None):
690         """ Install packages in the Linux host.
691
692         'home' is the directory to upload the package installation script.
693         'run_home' is the directory from where to execute the script.
694         """
695         command = self.install_packages_command(packages)
696
697         run_home = run_home or home
698
699         (out, err), proc = self.run_and_wait(command, run_home, 
700             shfile = os.path.join(home, "instpkg.sh"),
701             pidfile = "instpkg_pidfile",
702             ecodefile = "instpkg_exitcode",
703             stdout = "instpkg_stdout", 
704             stderr = "instpkg_stderr",
705             overwrite = False,
706             raise_on_error = True)
707
708         return (out, err), proc 
709
710     def remove_packages(self, packages, home, run_home = None):
711         """ Uninstall packages from the Linux host.
712
713         'home' is the directory to upload the package un-installation script.
714         'run_home' is the directory from where to execute the script.
715         """
716         if self.use_rpm:
717             command = rpmfuncs.remove_packages_command(self.os, packages)
718         elif self.use_deb:
719             command = debfuncs.remove_packages_command(self.os, packages)
720         else:
721             msg = "Error removing packages ( OS not known ) "
722             self.error(msg)
723             raise RuntimeError, msg
724
725         run_home = run_home or home
726
727         (out, err), proc = self.run_and_wait(command, run_home, 
728             shfile = os.path.join(home, "rmpkg.sh"),
729             pidfile = "rmpkg_pidfile",
730             ecodefile = "rmpkg_exitcode",
731             stdout = "rmpkg_stdout", 
732             stderr = "rmpkg_stderr",
733             overwrite = False,
734             raise_on_error = True)
735          
736         return (out, err), proc 
737
738     def mkdir(self, path, clean = False):
739         if clean:
740             self.rmdir(path)
741
742         return self.execute("mkdir -p %s" % path, with_lock = True)
743
744     def rmdir(self, path):
745         return self.execute("rm -rf %s" % path, with_lock = True)
746         
747     def run_and_wait(self, command, home, 
748             shfile = "cmd.sh",
749             env = None,
750             overwrite = True,
751             pidfile = "pidfile", 
752             ecodefile = "exitcode", 
753             stdin = None, 
754             stdout = "stdout", 
755             stderr = "stderr", 
756             sudo = False,
757             tty = False,
758             raise_on_error = False):
759         """
760         Uploads the 'command' to a bash script in the host.
761         Then runs the script detached in background in the host, and
762         busy-waites until the script finishes executing.
763         """
764
765         if not shfile.startswith("/"):
766             shfile = os.path.join(home, shfile)
767
768         self.upload_command(command, 
769             shfile = shfile, 
770             ecodefile = ecodefile, 
771             env = env,
772             overwrite = overwrite)
773
774         command = "bash %s" % shfile
775         # run command in background in remote host
776         (out, err), proc = self.run(command, home, 
777                 pidfile = pidfile,
778                 stdin = stdin, 
779                 stdout = stdout, 
780                 stderr = stderr, 
781                 sudo = sudo,
782                 tty = tty)
783
784         # check no errors occurred
785         if proc.poll():
786             msg = " Failed to run command '%s' " % command
787             self.error(msg, out, err)
788             if raise_on_error:
789                 raise RuntimeError, msg
790
791         # Wait for pid file to be generated
792         pid, ppid = self.wait_pid(
793                 home = home, 
794                 pidfile = pidfile, 
795                 raise_on_error = raise_on_error)
796
797         # wait until command finishes to execute
798         self.wait_run(pid, ppid)
799       
800         (eout, err), proc = self.check_errors(home,
801             ecodefile = ecodefile,
802             stderr = stderr)
803
804         # Out is what was written in the stderr file
805         if err:
806             msg = " Failed to run command '%s' " % command
807             self.error(msg, eout, err)
808
809             if raise_on_error:
810                 raise RuntimeError, msg
811
812         (out, oerr), proc = self.check_output(home, stdout)
813         
814         return (out, err), proc
815
816     def exitcode(self, home, ecodefile = "exitcode"):
817         """
818         Get the exit code of an application.
819         Returns an integer value with the exit code 
820         """
821         (out, err), proc = self.check_output(home, ecodefile)
822
823         # Succeeded to open file, return exit code in the file
824         if proc.wait() == 0:
825             try:
826                 return int(out.strip())
827             except:
828                 # Error in the content of the file!
829                 return ExitCode.CORRUPTFILE
830
831         # No such file or directory
832         if proc.returncode == 1:
833             return ExitCode.FILENOTFOUND
834         
835         # Other error from 'cat'
836         return ExitCode.ERROR
837
838     def upload_command(self, command, 
839             shfile = "cmd.sh",
840             ecodefile = "exitcode",
841             overwrite = True,
842             env = None):
843         """ Saves the command as a bash script file in the remote host, and
844         forces to save the exit code of the command execution to the ecodefile
845         """
846
847         if not (command.strip().endswith(";") or command.strip().endswith("&")):
848             command += ";"
849       
850         # The exit code of the command will be stored in ecodefile
851         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
852                 'command': command,
853                 'ecodefile': ecodefile,
854                 } 
855
856         # Export environment
857         environ = self.format_environment(env)
858
859         # Add environ to command
860         command = environ + command
861
862         return self.upload(command, shfile, text = True, overwrite = overwrite)
863
864     def format_environment(self, env, inline = False):
865         """ Formats the environment variables for a command to be executed
866         either as an inline command
867         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
868         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
869         """
870         if not env: return ""
871
872         # Remove extra white spaces
873         env = re.sub(r'\s+', ' ', env.strip())
874
875         sep = ";" if inline else "\n"
876         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
877
878     def check_errors(self, home, 
879             ecodefile = "exitcode", 
880             stderr = "stderr"):
881         """ Checks whether errors occurred while running a command.
882         It first checks the exit code for the command, and only if the
883         exit code is an error one it returns the error output.
884
885         """
886         proc = None
887         err = ""
888
889         # get exit code saved in the 'exitcode' file
890         ecode = self.exitcode(home, ecodefile)
891
892         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
893             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
894         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
895             # The process returned an error code or didn't exist. 
896             # Check standard error.
897             (err, eerr), proc = self.check_output(home, stderr)
898
899             # If the stderr file was not found, assume nothing bad happened,
900             # and just ignore the error.
901             # (cat returns 1 for error "No such file or directory")
902             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
903                 err = "" 
904             
905         return ("", err), proc
906  
907     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
908         """ Waits until the pid file for the command is generated, 
909             and returns the pid and ppid of the process """
910         pid = ppid = None
911         delay = 1.0
912
913         for i in xrange(4):
914             pidtuple = self.getpid(home = home, pidfile = pidfile)
915             
916             if pidtuple:
917                 pid, ppid = pidtuple
918                 break
919             else:
920                 time.sleep(delay)
921                 delay = delay * 1.5
922         else:
923             msg = " Failed to get pid for pidfile %s/%s " % (
924                     home, pidfile )
925             self.error(msg)
926             
927             if raise_on_error:
928                 raise RuntimeError, msg
929
930         return pid, ppid
931
932     def wait_run(self, pid, ppid, trial = 0):
933         """ wait for a remote process to finish execution """
934         delay = 1.0
935
936         while True:
937             status = self.status(pid, ppid)
938             
939             if status is ProcStatus.FINISHED:
940                 break
941             elif status is not ProcStatus.RUNNING:
942                 delay = delay * 1.5
943                 time.sleep(delay)
944                 # If it takes more than 20 seconds to start, then
945                 # asume something went wrong
946                 if delay > 20:
947                     break
948             else:
949                 # The app is running, just wait...
950                 time.sleep(0.5)
951
952     def check_output(self, home, filename):
953         """ Retrives content of file """
954         (out, err), proc = self.execute("cat %s" % 
955             os.path.join(home, filename), retry = 1, with_lock = True)
956         return (out, err), proc
957
958     def is_alive(self):
959         """ Checks if host is responsive
960         """
961         if self.localhost:
962             return True
963
964         out = err = ""
965         # The underlying SSH layer will sometimes return an empty
966         # output (even if the command was executed without errors).
967         # To work arround this, repeat the operation N times or
968         # until the result is not empty string
969         retrydelay = 1.0
970         for i in xrange(10):
971             try:
972                 (out, err), proc = self.execute("echo 'ALIVE'",
973                         retry = 5,
974                         blocking = True,
975                         with_lock = True)
976         
977                 if out.find("ALIVE") > -1:
978                     return True
979             except:
980                 trace = traceback.format_exc()
981                 msg = "Unresponsive host. Error reaching host: %s " % trace
982                 self.error(msg, out, err)
983                 return False
984
985             time.sleep(min(30.0, retrydelay))
986             retrydelay *= 1.5
987
988         if out.find("ALIVE") > -1:
989             return True
990         else:
991             msg = "Unresponsive host. Wrong answer. "
992             self.error(msg, out, err)
993             return False
994
995     def find_home(self):
996         """ Retrieves host home directory
997         """
998         # The underlying SSH layer will sometimes return an empty
999         # output (even if the command was executed without errors).
1000         # To work arround this, repeat the operation N times or
1001         # until the result is not empty string
1002         retrydelay = 1.0
1003         for i in xrange(10):
1004             try:
1005                 (out, err), proc = self.execute("echo ${HOME}",
1006                         retry = 5,
1007                         blocking = True,
1008                         with_lock = True)
1009         
1010                 if out.strip() != "":
1011                     self._home_dir =  out.strip()
1012                     break
1013             except:
1014                 trace = traceback.format_exc()
1015                 msg = "Impossible to retrieve HOME directory" % trace
1016                 self.error(msg, out, err)
1017                 return False
1018
1019             time.sleep(min(30.0, retrydelay))
1020             retrydelay *= 1.5
1021
1022         if not self._home_dir:
1023             msg = "Impossible to retrieve HOME directory"
1024             self.error(msg, out, err)
1025             raise RuntimeError, msg
1026
1027     def filter_existing_files(self, src, dst):
1028         """ Removes files that already exist in the Linux host from src list
1029         """
1030         # construct a dictionary with { dst: src }
1031         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
1032             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1033
1034         command = []
1035         for d in dests.keys():
1036             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1037
1038         command = ";".join(command)
1039
1040         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1041     
1042         for d in dests.keys():
1043             if out.find(d) > -1:
1044                 del dests[d]
1045
1046         if not dests:
1047             return ""
1048
1049         return " ".join(dests.values())
1050