Modified FailureManager to abort only when critical resources fail
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay, failtrap
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.ExecReadOnly)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.ExecReadOnly)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.ExecReadOnly)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.ExecReadOnly)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.ExecReadOnly)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.ExecReadOnly)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.ExecReadOnly)
190
191         cls._register_attribute(hostname)
192         cls._register_attribute(username)
193         cls._register_attribute(port)
194         cls._register_attribute(home)
195         cls._register_attribute(identity)
196         cls._register_attribute(server_key)
197         cls._register_attribute(clean_home)
198         cls._register_attribute(clean_experiment)
199         cls._register_attribute(clean_processes)
200         cls._register_attribute(tear_down)
201
202     def __init__(self, ec, guid):
203         super(LinuxNode, self).__init__(ec, guid)
204         self._os = None
205         # home directory at Linux host
206         self._home_dir = ""
207         
208         # lock to prevent concurrent applications on the same node,
209         # to execute commands at the same time. There are potential
210         # concurrency issues when using SSH to a same host from 
211         # multiple threads. There are also possible operational 
212         # issues, e.g. an application querying the existence 
213         # of a file or folder prior to its creation, and another 
214         # application creating the same file or folder in between.
215         self._node_lock = threading.Lock()
216     
217     def log_message(self, msg):
218         return " guid %d - host %s - %s " % (self.guid, 
219                 self.get("hostname"), msg)
220
221     @property
222     def home_dir(self):
223         home = self.get("home") or ""
224         if not home.startswith("/"):
225            home = os.path.join(self._home_dir, home) 
226         return home
227
228     @property
229     def usr_dir(self):
230         return os.path.join(self.home_dir, "nepi-usr")
231
232     @property
233     def lib_dir(self):
234         return os.path.join(self.usr_dir, "lib")
235
236     @property
237     def bin_dir(self):
238         return os.path.join(self.usr_dir, "bin")
239
240     @property
241     def src_dir(self):
242         return os.path.join(self.usr_dir, "src")
243
244     @property
245     def share_dir(self):
246         return os.path.join(self.usr_dir, "share")
247
248     @property
249     def exp_dir(self):
250         return os.path.join(self.home_dir, "nepi-exp")
251
252     @property
253     def exp_home(self):
254         return os.path.join(self.exp_dir, self.ec.exp_id)
255
256     @property
257     def node_home(self):
258         return os.path.join(self.exp_home, "node-%d" % self.guid)
259
260     @property
261     def run_home(self):
262         return os.path.join(self.node_home, self.ec.run_id)
263
264     @property
265     def os(self):
266         if self._os:
267             return self._os
268
269         if (not self.get("hostname") or not self.get("username")):
270             msg = "Can't resolve OS, insufficient data "
271             self.error(msg)
272             raise RuntimeError, msg
273
274         out = self.get_os()
275
276         if out.find("Fedora release 8") == 0:
277             self._os = OSType.FEDORA_8
278         elif out.find("Fedora release 12") == 0:
279             self._os = OSType.FEDORA_12
280         elif out.find("Fedora release 14") == 0:
281             self._os = OSType.FEDORA_14
282         elif out.find("Debian") == 0: 
283             self._os = OSType.DEBIAN
284         elif out.find("Ubuntu") ==0:
285             self._os = OSType.UBUNTU
286         else:
287             msg = "Unsupported OS"
288             self.error(msg, out)
289             raise RuntimeError, "%s - %s " %( msg, out )
290
291         return self._os
292
293     def get_os(self):
294         # The underlying SSH layer will sometimes return an empty
295         # output (even if the command was executed without errors).
296         # To work arround this, repeat the operation N times or
297         # until the result is not empty string
298         out = ""
299         retrydelay = 1.0
300         for i in xrange(10):
301             try:
302                 (out, err), proc = self.execute("cat /etc/issue", 
303                         retry = 5,
304                         with_lock = True,
305                         blocking = True)
306
307                 if out.strip() != "":
308                     return out
309             except:
310                 trace = traceback.format_exc()
311                 msg = "Error detecting OS: %s " % trace
312                 self.error(msg, out, err)
313                 return False
314
315             time.sleep(min(30.0, retrydelay))
316             retrydelay *= 1.5
317
318     @property
319     def use_deb(self):
320         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
321
322     @property
323     def use_rpm(self):
324         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
325                 OSType.FEDORA]
326
327     @property
328     def localhost(self):
329         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
330
331     @failtrap
332     def provision(self):
333         # check if host is alive
334         if not self.is_alive():
335             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
336             self.error(msg)
337             raise RuntimeError, msg
338
339         self.find_home()
340
341         if self.get("cleanProcesses"):
342             self.clean_processes()
343
344         if self.get("cleanHome"):
345             self.clean_home()
346  
347         if self.get("cleanExperiment"):
348             self.clean_experiment()
349     
350         # Create shared directory structure
351         self.mkdir(self.lib_dir)
352         self.mkdir(self.bin_dir)
353         self.mkdir(self.src_dir)
354         self.mkdir(self.share_dir)
355
356         # Create experiment node home directory
357         self.mkdir(self.node_home)
358
359         super(LinuxNode, self).provision()
360
361     @failtrap
362     def deploy(self):
363         if self.state == ResourceState.NEW:
364             self.info("Deploying node")
365             self.discover()
366             self.provision()
367
368         # Node needs to wait until all associated interfaces are 
369         # ready before it can finalize deployment
370         from nepi.resources.linux.interface import LinuxInterface
371         ifaces = self.get_connected(LinuxInterface.rtype())
372         for iface in ifaces:
373             if iface.state < ResourceState.READY:
374                 self.ec.schedule(reschedule_delay, self.deploy)
375                 return 
376
377         super(LinuxNode, self).deploy()
378
379     def release(self):
380         try:
381             rms = self.get_connected()
382             for rm in rms:
383                 # Node needs to wait until all associated RMs are released
384                 # before it can be released
385                 if rm.state < ResourceState.STOPPED:
386                     self.ec.schedule(reschedule_delay, self.release)
387                     return 
388
389             tear_down = self.get("tearDown")
390             if tear_down:
391                 self.execute(tear_down)
392
393             self.clean_processes()
394         except:
395             import traceback
396             err = traceback.format_exc()
397             self.error(err)
398
399         super(LinuxNode, self).release()
400
401     def valid_connection(self, guid):
402         # TODO: Validate!
403         return True
404
405     def clean_processes(self, killer = False):
406         self.info("Cleaning up processes")
407         
408         if killer:
409             # Hardcore kill
410             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
411                 "sudo -S killall python tcpdump || /bin/true ; " +
412                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
413                 "sudo -S killall -u root || /bin/true ; " +
414                 "sudo -S killall -u root || /bin/true ; ")
415         else:
416             # Be gentler...
417             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
418                 "sudo -S killall tcpdump || /bin/true ; " +
419                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
420                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
421
422         out = err = ""
423         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
424             
425     def clean_home(self):
426         """ Cleans all NEPI related folders in the Linux host
427         """
428         self.info("Cleaning up home")
429         
430         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
431                 self.home_dir )
432
433         return self.execute(cmd, with_lock = True)
434
435     def clean_experiment(self):
436         """ Cleans all experiment related files in the Linux host.
437         It preserves NEPI files and folders that have a multi experiment
438         scope.
439         """
440         self.info("Cleaning up experiment files")
441         
442         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
443                 self.exp_dir,
444                 self.ec.exp_id )
445             
446         return self.execute(cmd, with_lock = True)
447
448     def execute(self, command,
449             sudo = False,
450             stdin = None, 
451             env = None,
452             tty = False,
453             forward_x11 = False,
454             timeout = None,
455             retry = 3,
456             err_on_timeout = True,
457             connect_timeout = 30,
458             strict_host_checking = False,
459             persistent = True,
460             blocking = True,
461             with_lock = False
462             ):
463         """ Notice that this invocation will block until the
464         execution finishes. If this is not the desired behavior,
465         use 'run' instead."""
466
467         if self.localhost:
468             (out, err), proc = execfuncs.lexec(command, 
469                     user = user,
470                     sudo = sudo,
471                     stdin = stdin,
472                     env = env)
473         else:
474             if with_lock:
475                 with self._node_lock:
476                     (out, err), proc = sshfuncs.rexec(
477                         command, 
478                         host = self.get("hostname"),
479                         user = self.get("username"),
480                         port = self.get("port"),
481                         agent = True,
482                         sudo = sudo,
483                         stdin = stdin,
484                         identity = self.get("identity"),
485                         server_key = self.get("serverKey"),
486                         env = env,
487                         tty = tty,
488                         forward_x11 = forward_x11,
489                         timeout = timeout,
490                         retry = retry,
491                         err_on_timeout = err_on_timeout,
492                         connect_timeout = connect_timeout,
493                         persistent = persistent,
494                         blocking = blocking, 
495                         strict_host_checking = strict_host_checking
496                         )
497             else:
498                 (out, err), proc = sshfuncs.rexec(
499                     command, 
500                     host = self.get("hostname"),
501                     user = self.get("username"),
502                     port = self.get("port"),
503                     agent = True,
504                     sudo = sudo,
505                     stdin = stdin,
506                     identity = self.get("identity"),
507                     server_key = self.get("serverKey"),
508                     env = env,
509                     tty = tty,
510                     forward_x11 = forward_x11,
511                     timeout = timeout,
512                     retry = retry,
513                     err_on_timeout = err_on_timeout,
514                     connect_timeout = connect_timeout,
515                     persistent = persistent,
516                     blocking = blocking, 
517                     strict_host_checking = strict_host_checking
518                     )
519
520         return (out, err), proc
521
522     def run(self, command, home,
523             create_home = False,
524             pidfile = 'pidfile',
525             stdin = None, 
526             stdout = 'stdout', 
527             stderr = 'stderr', 
528             sudo = False,
529             tty = False):
530         
531         self.debug("Running command '%s'" % command)
532         
533         if self.localhost:
534             (out, err), proc = execfuncs.lspawn(command, pidfile, 
535                     stdout = stdout, 
536                     stderr = stderr, 
537                     stdin = stdin, 
538                     home = home, 
539                     create_home = create_home, 
540                     sudo = sudo,
541                     user = user) 
542         else:
543             with self._node_lock:
544                 (out, err), proc = sshfuncs.rspawn(
545                     command,
546                     pidfile = pidfile,
547                     home = home,
548                     create_home = create_home,
549                     stdin = stdin if stdin is not None else '/dev/null',
550                     stdout = stdout if stdout else '/dev/null',
551                     stderr = stderr if stderr else '/dev/null',
552                     sudo = sudo,
553                     host = self.get("hostname"),
554                     user = self.get("username"),
555                     port = self.get("port"),
556                     agent = True,
557                     identity = self.get("identity"),
558                     server_key = self.get("serverKey"),
559                     tty = tty
560                     )
561
562         return (out, err), proc
563
564     def getpid(self, home, pidfile = "pidfile"):
565         if self.localhost:
566             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
567         else:
568             with self._node_lock:
569                 pidtuple = sshfuncs.rgetpid(
570                     os.path.join(home, pidfile),
571                     host = self.get("hostname"),
572                     user = self.get("username"),
573                     port = self.get("port"),
574                     agent = True,
575                     identity = self.get("identity"),
576                     server_key = self.get("serverKey")
577                     )
578         
579         return pidtuple
580
581     def status(self, pid, ppid):
582         if self.localhost:
583             status = execfuncs.lstatus(pid, ppid)
584         else:
585             with self._node_lock:
586                 status = sshfuncs.rstatus(
587                         pid, ppid,
588                         host = self.get("hostname"),
589                         user = self.get("username"),
590                         port = self.get("port"),
591                         agent = True,
592                         identity = self.get("identity"),
593                         server_key = self.get("serverKey")
594                         )
595            
596         return status
597     
598     def kill(self, pid, ppid, sudo = False):
599         out = err = ""
600         proc = None
601         status = self.status(pid, ppid)
602
603         if status == sshfuncs.ProcStatus.RUNNING:
604             if self.localhost:
605                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
606             else:
607                 with self._node_lock:
608                     (out, err), proc = sshfuncs.rkill(
609                         pid, ppid,
610                         host = self.get("hostname"),
611                         user = self.get("username"),
612                         port = self.get("port"),
613                         agent = True,
614                         sudo = sudo,
615                         identity = self.get("identity"),
616                         server_key = self.get("serverKey")
617                         )
618
619         return (out, err), proc
620
621     def copy(self, src, dst):
622         if self.localhost:
623             (out, err), proc = execfuncs.lcopy(source, dest, 
624                     recursive = True,
625                     strict_host_checking = False)
626         else:
627             with self._node_lock:
628                 (out, err), proc = sshfuncs.rcopy(
629                     src, dst, 
630                     port = self.get("port"),
631                     identity = self.get("identity"),
632                     server_key = self.get("serverKey"),
633                     recursive = True,
634                     strict_host_checking = False)
635
636         return (out, err), proc
637
638     def upload(self, src, dst, text = False, overwrite = True):
639         """ Copy content to destination
640
641            src  content to copy. Can be a local file, directory or a list of files
642
643            dst  destination path on the remote host (remote is always self.host)
644
645            text src is text input, it must be stored into a temp file before uploading
646         """
647         # If source is a string input 
648         f = None
649         if text and not os.path.isfile(src):
650             # src is text input that should be uploaded as file
651             # create a temporal file with the content to upload
652             f = tempfile.NamedTemporaryFile(delete=False)
653             f.write(src)
654             f.close()
655             src = f.name
656
657         # If dst files should not be overwritten, check that the files do not
658         # exits already 
659         if overwrite == False:
660             src = self.filter_existing_files(src, dst)
661             if not src:
662                 return ("", ""), None 
663
664         if not self.localhost:
665             # Build destination as <user>@<server>:<path>
666             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
667
668         result = self.copy(src, dst)
669
670         # clean up temp file
671         if f:
672             os.remove(f.name)
673
674         return result
675
676     def download(self, src, dst):
677         if not self.localhost:
678             # Build destination as <user>@<server>:<path>
679             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
680         return self.copy(src, dst)
681
682     def install_packages_command(self, packages):
683         command = ""
684         if self.use_rpm:
685             command = rpmfuncs.install_packages_command(self.os, packages)
686         elif self.use_deb:
687             command = debfuncs.install_packages_command(self.os, packages)
688         else:
689             msg = "Error installing packages ( OS not known ) "
690             self.error(msg, self.os)
691             raise RuntimeError, msg
692
693         return command
694
695     def install_packages(self, packages, home, run_home = None):
696         """ Install packages in the Linux host.
697
698         'home' is the directory to upload the package installation script.
699         'run_home' is the directory from where to execute the script.
700         """
701         command = self.install_packages_command(packages)
702
703         run_home = run_home or home
704
705         (out, err), proc = self.run_and_wait(command, run_home, 
706             shfile = os.path.join(home, "instpkg.sh"),
707             pidfile = "instpkg_pidfile",
708             ecodefile = "instpkg_exitcode",
709             stdout = "instpkg_stdout", 
710             stderr = "instpkg_stderr",
711             overwrite = False,
712             raise_on_error = True)
713
714         return (out, err), proc 
715
716     def remove_packages(self, packages, home, run_home = None):
717         """ Uninstall packages from the Linux host.
718
719         'home' is the directory to upload the package un-installation script.
720         'run_home' is the directory from where to execute the script.
721         """
722         if self.use_rpm:
723             command = rpmfuncs.remove_packages_command(self.os, packages)
724         elif self.use_deb:
725             command = debfuncs.remove_packages_command(self.os, packages)
726         else:
727             msg = "Error removing packages ( OS not known ) "
728             self.error(msg)
729             raise RuntimeError, msg
730
731         run_home = run_home or home
732
733         (out, err), proc = self.run_and_wait(command, run_home, 
734             shfile = os.path.join(home, "rmpkg.sh"),
735             pidfile = "rmpkg_pidfile",
736             ecodefile = "rmpkg_exitcode",
737             stdout = "rmpkg_stdout", 
738             stderr = "rmpkg_stderr",
739             overwrite = False,
740             raise_on_error = True)
741          
742         return (out, err), proc 
743
744     def mkdir(self, path, clean = False):
745         if clean:
746             self.rmdir(path)
747
748         return self.execute("mkdir -p %s" % path, with_lock = True)
749
750     def rmdir(self, path):
751         return self.execute("rm -rf %s" % path, with_lock = True)
752         
753     def run_and_wait(self, command, home, 
754             shfile = "cmd.sh",
755             env = None,
756             overwrite = True,
757             pidfile = "pidfile", 
758             ecodefile = "exitcode", 
759             stdin = None, 
760             stdout = "stdout", 
761             stderr = "stderr", 
762             sudo = False,
763             tty = False,
764             raise_on_error = False):
765         """
766         Uploads the 'command' to a bash script in the host.
767         Then runs the script detached in background in the host, and
768         busy-waites until the script finishes executing.
769         """
770
771         if not shfile.startswith("/"):
772             shfile = os.path.join(home, shfile)
773
774         self.upload_command(command, 
775             shfile = shfile, 
776             ecodefile = ecodefile, 
777             env = env,
778             overwrite = overwrite)
779
780         command = "bash %s" % shfile
781         # run command in background in remote host
782         (out, err), proc = self.run(command, home, 
783                 pidfile = pidfile,
784                 stdin = stdin, 
785                 stdout = stdout, 
786                 stderr = stderr, 
787                 sudo = sudo,
788                 tty = tty)
789
790         # check no errors occurred
791         if proc.poll():
792             msg = " Failed to run command '%s' " % command
793             self.error(msg, out, err)
794             if raise_on_error:
795                 raise RuntimeError, msg
796
797         # Wait for pid file to be generated
798         pid, ppid = self.wait_pid(
799                 home = home, 
800                 pidfile = pidfile, 
801                 raise_on_error = raise_on_error)
802
803         # wait until command finishes to execute
804         self.wait_run(pid, ppid)
805       
806         (eout, err), proc = self.check_errors(home,
807             ecodefile = ecodefile,
808             stderr = stderr)
809
810         # Out is what was written in the stderr file
811         if err:
812             msg = " Failed to run command '%s' " % command
813             self.error(msg, eout, err)
814
815             if raise_on_error:
816                 raise RuntimeError, msg
817
818         (out, oerr), proc = self.check_output(home, stdout)
819         
820         return (out, err), proc
821
822     def exitcode(self, home, ecodefile = "exitcode"):
823         """
824         Get the exit code of an application.
825         Returns an integer value with the exit code 
826         """
827         (out, err), proc = self.check_output(home, ecodefile)
828
829         # Succeeded to open file, return exit code in the file
830         if proc.wait() == 0:
831             try:
832                 return int(out.strip())
833             except:
834                 # Error in the content of the file!
835                 return ExitCode.CORRUPTFILE
836
837         # No such file or directory
838         if proc.returncode == 1:
839             return ExitCode.FILENOTFOUND
840         
841         # Other error from 'cat'
842         return ExitCode.ERROR
843
844     def upload_command(self, command, 
845             shfile = "cmd.sh",
846             ecodefile = "exitcode",
847             overwrite = True,
848             env = None):
849         """ Saves the command as a bash script file in the remote host, and
850         forces to save the exit code of the command execution to the ecodefile
851         """
852
853         if not (command.strip().endswith(";") or command.strip().endswith("&")):
854             command += ";"
855       
856         # The exit code of the command will be stored in ecodefile
857         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
858                 'command': command,
859                 'ecodefile': ecodefile,
860                 } 
861
862         # Export environment
863         environ = self.format_environment(env)
864
865         # Add environ to command
866         command = environ + command
867
868         return self.upload(command, shfile, text = True, overwrite = overwrite)
869
870     def format_environment(self, env, inline = False):
871         """ Formats the environment variables for a command to be executed
872         either as an inline command
873         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
874         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
875         """
876         if not env: return ""
877
878         # Remove extra white spaces
879         env = re.sub(r'\s+', ' ', env.strip())
880
881         sep = ";" if inline else "\n"
882         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
883
884     def check_errors(self, home, 
885             ecodefile = "exitcode", 
886             stderr = "stderr"):
887         """ Checks whether errors occurred while running a command.
888         It first checks the exit code for the command, and only if the
889         exit code is an error one it returns the error output.
890
891         """
892         proc = None
893         err = ""
894
895         # get exit code saved in the 'exitcode' file
896         ecode = self.exitcode(home, ecodefile)
897
898         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
899             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
900         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
901             # The process returned an error code or didn't exist. 
902             # Check standard error.
903             (err, eerr), proc = self.check_output(home, stderr)
904
905             # If the stderr file was not found, assume nothing bad happened,
906             # and just ignore the error.
907             # (cat returns 1 for error "No such file or directory")
908             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
909                 err = "" 
910             
911         return ("", err), proc
912  
913     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
914         """ Waits until the pid file for the command is generated, 
915             and returns the pid and ppid of the process """
916         pid = ppid = None
917         delay = 1.0
918
919         for i in xrange(4):
920             pidtuple = self.getpid(home = home, pidfile = pidfile)
921             
922             if pidtuple:
923                 pid, ppid = pidtuple
924                 break
925             else:
926                 time.sleep(delay)
927                 delay = delay * 1.5
928         else:
929             msg = " Failed to get pid for pidfile %s/%s " % (
930                     home, pidfile )
931             self.error(msg)
932             
933             if raise_on_error:
934                 raise RuntimeError, msg
935
936         return pid, ppid
937
938     def wait_run(self, pid, ppid, trial = 0):
939         """ wait for a remote process to finish execution """
940         delay = 1.0
941
942         while True:
943             status = self.status(pid, ppid)
944             
945             if status is ProcStatus.FINISHED:
946                 break
947             elif status is not ProcStatus.RUNNING:
948                 delay = delay * 1.5
949                 time.sleep(delay)
950                 # If it takes more than 20 seconds to start, then
951                 # asume something went wrong
952                 if delay > 20:
953                     break
954             else:
955                 # The app is running, just wait...
956                 time.sleep(0.5)
957
958     def check_output(self, home, filename):
959         """ Retrives content of file """
960         (out, err), proc = self.execute("cat %s" % 
961             os.path.join(home, filename), retry = 1, with_lock = True)
962         return (out, err), proc
963
964     def is_alive(self):
965         """ Checks if host is responsive
966         """
967         if self.localhost:
968             return True
969
970         out = err = ""
971         # The underlying SSH layer will sometimes return an empty
972         # output (even if the command was executed without errors).
973         # To work arround this, repeat the operation N times or
974         # until the result is not empty string
975         retrydelay = 1.0
976         for i in xrange(10):
977             try:
978                 (out, err), proc = self.execute("echo 'ALIVE'",
979                         retry = 5,
980                         blocking = True,
981                         with_lock = True)
982         
983                 if out.find("ALIVE") > -1:
984                     return True
985             except:
986                 trace = traceback.format_exc()
987                 msg = "Unresponsive host. Error reaching host: %s " % trace
988                 self.error(msg, out, err)
989                 return False
990
991             time.sleep(min(30.0, retrydelay))
992             retrydelay *= 1.5
993
994         if out.find("ALIVE") > -1:
995             return True
996         else:
997             msg = "Unresponsive host. Wrong answer. "
998             self.error(msg, out, err)
999             return False
1000
1001     def find_home(self):
1002         """ Retrieves host home directory
1003         """
1004         # The underlying SSH layer will sometimes return an empty
1005         # output (even if the command was executed without errors).
1006         # To work arround this, repeat the operation N times or
1007         # until the result is not empty string
1008         retrydelay = 1.0
1009         for i in xrange(10):
1010             try:
1011                 (out, err), proc = self.execute("echo ${HOME}",
1012                         retry = 5,
1013                         blocking = True,
1014                         with_lock = True)
1015         
1016                 if out.strip() != "":
1017                     self._home_dir =  out.strip()
1018                     break
1019             except:
1020                 trace = traceback.format_exc()
1021                 msg = "Impossible to retrieve HOME directory" % trace
1022                 self.error(msg, out, err)
1023                 return False
1024
1025             time.sleep(min(30.0, retrydelay))
1026             retrydelay *= 1.5
1027
1028         if not self._home_dir:
1029             msg = "Impossible to retrieve HOME directory"
1030             self.error(msg, out, err)
1031             raise RuntimeError, msg
1032
1033     def filter_existing_files(self, src, dst):
1034         """ Removes files that already exist in the Linux host from src list
1035         """
1036         # construct a dictionary with { dst: src }
1037         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
1038             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1039
1040         command = []
1041         for d in dests.keys():
1042             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1043
1044         command = ";".join(command)
1045
1046         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1047     
1048         for d in dests.keys():
1049             if out.find(d) > -1:
1050                 del dests[d]
1051
1052         if not dests:
1053             return ""
1054
1055         return " ".join(dests.values())
1056