Bugfixing GPG key error when installing dependencies in linux nodes
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
22         reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.ExecReadOnly)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.ExecReadOnly)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.ExecReadOnly)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 flags = Flags.ExecReadOnly)
172
173         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
174                 " from a previous same experiment, before the new experiment starts", 
175                 flags = Flags.ExecReadOnly)
176         
177         clean_processes = Attribute("cleanProcesses", 
178                 "Kill all running processes before starting experiment",
179                 flags = Flags.ExecReadOnly)
180         
181         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
182                 "releasing the resource",
183                 flags = Flags.ExecReadOnly)
184
185         cls._register_attribute(hostname)
186         cls._register_attribute(username)
187         cls._register_attribute(port)
188         cls._register_attribute(home)
189         cls._register_attribute(identity)
190         cls._register_attribute(server_key)
191         cls._register_attribute(clean_home)
192         cls._register_attribute(clean_experiment)
193         cls._register_attribute(clean_processes)
194         cls._register_attribute(tear_down)
195
196     def __init__(self, ec, guid):
197         super(LinuxNode, self).__init__(ec, guid)
198         self._os = None
199         # home directory at Linux host
200         self._home_dir = ""
201         
202         # lock to prevent concurrent applications on the same node,
203         # to execute commands at the same time. There are potential
204         # concurrency issues when using SSH to a same host from 
205         # multiple threads. There are also possible operational 
206         # issues, e.g. an application querying the existence 
207         # of a file or folder prior to its creation, and another 
208         # application creating the same file or folder in between.
209         self._node_lock = threading.Lock()
210     
211     def log_message(self, msg):
212         return " guid %d - host %s - %s " % (self.guid, 
213                 self.get("hostname"), msg)
214
215     @property
216     def home_dir(self):
217         home = self.get("home") or ""
218         if not home.startswith("/"):
219            home = os.path.join(self._home_dir, home) 
220         return home
221
222     @property
223     def usr_dir(self):
224         return os.path.join(self.home_dir, "nepi-usr")
225
226     @property
227     def lib_dir(self):
228         return os.path.join(self.usr_dir, "lib")
229
230     @property
231     def bin_dir(self):
232         return os.path.join(self.usr_dir, "bin")
233
234     @property
235     def src_dir(self):
236         return os.path.join(self.usr_dir, "src")
237
238     @property
239     def share_dir(self):
240         return os.path.join(self.usr_dir, "share")
241
242     @property
243     def exp_dir(self):
244         return os.path.join(self.home_dir, "nepi-exp")
245
246     @property
247     def exp_home(self):
248         return os.path.join(self.exp_dir, self.ec.exp_id)
249
250     @property
251     def node_home(self):
252         return os.path.join(self.exp_home, "node-%d" % self.guid)
253
254     @property
255     def run_home(self):
256         return os.path.join(self.node_home, self.ec.run_id)
257
258     @property
259     def os(self):
260         if self._os:
261             return self._os
262
263         if (not self.get("hostname") or not self.get("username")):
264             msg = "Can't resolve OS, insufficient data "
265             self.error(msg)
266             raise RuntimeError, msg
267
268         out = self.get_os()
269
270         if out.find("Fedora release 8") == 0:
271             self._os = OSType.FEDORA_8
272         elif out.find("Fedora release 12") == 0:
273             self._os = OSType.FEDORA_12
274         elif out.find("Fedora release 14") == 0:
275             self._os = OSType.FEDORA_14
276         elif out.find("Fedora release") == 0:
277             self._os = OSType.FEDORA
278         elif out.find("Debian") == 0: 
279             self._os = OSType.DEBIAN
280         elif out.find("Ubuntu") ==0:
281             self._os = OSType.UBUNTU
282         else:
283             msg = "Unsupported OS"
284             self.error(msg, out)
285             raise RuntimeError, "%s - %s " %( msg, out )
286
287         return self._os
288
289     def get_os(self):
290         # The underlying SSH layer will sometimes return an empty
291         # output (even if the command was executed without errors).
292         # To work arround this, repeat the operation N times or
293         # until the result is not empty string
294         out = ""
295         retrydelay = 1.0
296         for i in xrange(2):
297             try:
298                 (out, err), proc = self.execute("cat /etc/issue", 
299                         retry = 5,
300                         with_lock = True,
301                         blocking = True)
302
303                 if out.strip() != "":
304                     return out
305             except:
306                 trace = traceback.format_exc()
307                 msg = "Error detecting OS: %s " % trace
308                 self.error(msg, out, err)
309                 return False
310
311             time.sleep(min(30.0, retrydelay))
312             retrydelay *= 1.5
313
314
315     @property
316     def use_deb(self):
317         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
318
319     @property
320     def use_rpm(self):
321         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
322                 OSType.FEDORA]
323
324     @property
325     def localhost(self):
326         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
327
328     def provision(self):
329         # check if host is alive
330         if not self.is_alive():
331             self.fail()
332             
333             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
334             self.error(msg)
335             raise RuntimeError, msg
336
337         self.find_home()
338
339         if self.get("cleanProcesses"):
340             self.clean_processes()
341
342         if self.get("cleanHome"):
343             self.clean_home()
344  
345         if self.get("cleanExperiment"):
346             self.clean_experiment()
347     
348         # Create shared directory structure
349         self.mkdir(self.lib_dir)
350         self.mkdir(self.bin_dir)
351         self.mkdir(self.src_dir)
352         self.mkdir(self.share_dir)
353
354         # Create experiment node home directory
355         self.mkdir(self.node_home)
356
357         super(LinuxNode, self).provision()
358
359     def deploy(self):
360         if self.state == ResourceState.NEW:
361             try:
362                 self.discover()
363                 self.provision()
364             except:
365                 self.fail()
366                 raise
367
368         # Node needs to wait until all associated interfaces are 
369         # ready before it can finalize deployment
370         from nepi.resources.linux.interface import LinuxInterface
371         ifaces = self.get_connected(LinuxInterface.rtype())
372         for iface in ifaces:
373             if iface.state < ResourceState.READY:
374                 self.ec.schedule(reschedule_delay, self.deploy)
375                 return 
376
377         super(LinuxNode, self).deploy()
378
379     def release(self):
380         # Node needs to wait until all associated RMs are released
381         # to be released
382         rms = self.get_connected()
383         for rm in rms:
384             if rm.state < ResourceState.STOPPED:
385                 self.ec.schedule(reschedule_delay, self.release)
386                 return 
387
388         tear_down = self.get("tearDown")
389         if tear_down:
390             self.execute(tear_down)
391
392         self.clean_processes()
393
394         super(LinuxNode, self).release()
395
396     def valid_connection(self, guid):
397         # TODO: Validate!
398         return True
399
400     def clean_processes(self, killer = False):
401         self.info("Cleaning up processes")
402         
403         if killer:
404             # Hardcore kill
405             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
406                 "sudo -S killall python tcpdump || /bin/true ; " +
407                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
408                 "sudo -S killall -u root || /bin/true ; " +
409                 "sudo -S killall -u root || /bin/true ; ")
410         else:
411             # Be gentler...
412             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
413                 "sudo -S killall tcpdump || /bin/true ; " +
414                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
415                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
416
417         out = err = ""
418         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
419             
420     def clean_home(self):
421         """ Cleans all NEPI related folders in the Linux host
422         """
423         self.info("Cleaning up home")
424         
425         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
426                 self.home_dir )
427
428         return self.execute(cmd, with_lock = True)
429
430     def clean_experiment(self):
431         """ Cleans all experiment related files in the Linux host.
432         It preserves NEPI files and folders that have a multi experiment
433         scope.
434         """
435         self.info("Cleaning up experiment files")
436         
437         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
438                 self.exp_dir,
439                 self.ec.exp_id )
440             
441         return self.execute(cmd, with_lock = True)
442
443     def execute(self, command,
444             sudo = False,
445             stdin = None, 
446             env = None,
447             tty = False,
448             forward_x11 = False,
449             timeout = None,
450             retry = 3,
451             err_on_timeout = True,
452             connect_timeout = 30,
453             strict_host_checking = False,
454             persistent = True,
455             blocking = True,
456             with_lock = False
457             ):
458         """ Notice that this invocation will block until the
459         execution finishes. If this is not the desired behavior,
460         use 'run' instead."""
461
462         if self.localhost:
463             (out, err), proc = execfuncs.lexec(command, 
464                     user = user,
465                     sudo = sudo,
466                     stdin = stdin,
467                     env = env)
468         else:
469             if with_lock:
470                 with self._node_lock:
471                     (out, err), proc = sshfuncs.rexec(
472                         command, 
473                         host = self.get("hostname"),
474                         user = self.get("username"),
475                         port = self.get("port"),
476                         agent = True,
477                         sudo = sudo,
478                         stdin = stdin,
479                         identity = self.get("identity"),
480                         server_key = self.get("serverKey"),
481                         env = env,
482                         tty = tty,
483                         forward_x11 = forward_x11,
484                         timeout = timeout,
485                         retry = retry,
486                         err_on_timeout = err_on_timeout,
487                         connect_timeout = connect_timeout,
488                         persistent = persistent,
489                         blocking = blocking, 
490                         strict_host_checking = strict_host_checking
491                         )
492             else:
493                 (out, err), proc = sshfuncs.rexec(
494                     command, 
495                     host = self.get("hostname"),
496                     user = self.get("username"),
497                     port = self.get("port"),
498                     agent = True,
499                     sudo = sudo,
500                     stdin = stdin,
501                     identity = self.get("identity"),
502                     server_key = self.get("serverKey"),
503                     env = env,
504                     tty = tty,
505                     forward_x11 = forward_x11,
506                     timeout = timeout,
507                     retry = retry,
508                     err_on_timeout = err_on_timeout,
509                     connect_timeout = connect_timeout,
510                     persistent = persistent,
511                     blocking = blocking, 
512                     strict_host_checking = strict_host_checking
513                     )
514
515         return (out, err), proc
516
517     def run(self, command, home,
518             create_home = False,
519             pidfile = 'pidfile',
520             stdin = None, 
521             stdout = 'stdout', 
522             stderr = 'stderr', 
523             sudo = False,
524             tty = False):
525         
526         self.debug("Running command '%s'" % command)
527         
528         if self.localhost:
529             (out, err), proc = execfuncs.lspawn(command, pidfile, 
530                     stdout = stdout, 
531                     stderr = stderr, 
532                     stdin = stdin, 
533                     home = home, 
534                     create_home = create_home, 
535                     sudo = sudo,
536                     user = user) 
537         else:
538             with self._node_lock:
539                 (out, err), proc = sshfuncs.rspawn(
540                     command,
541                     pidfile = pidfile,
542                     home = home,
543                     create_home = create_home,
544                     stdin = stdin if stdin is not None else '/dev/null',
545                     stdout = stdout if stdout else '/dev/null',
546                     stderr = stderr if stderr else '/dev/null',
547                     sudo = sudo,
548                     host = self.get("hostname"),
549                     user = self.get("username"),
550                     port = self.get("port"),
551                     agent = True,
552                     identity = self.get("identity"),
553                     server_key = self.get("serverKey"),
554                     tty = tty
555                     )
556
557         return (out, err), proc
558
559     def getpid(self, home, pidfile = "pidfile"):
560         if self.localhost:
561             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
562         else:
563             with self._node_lock:
564                 pidtuple = sshfuncs.rgetpid(
565                     os.path.join(home, pidfile),
566                     host = self.get("hostname"),
567                     user = self.get("username"),
568                     port = self.get("port"),
569                     agent = True,
570                     identity = self.get("identity"),
571                     server_key = self.get("serverKey")
572                     )
573         
574         return pidtuple
575
576     def status(self, pid, ppid):
577         if self.localhost:
578             status = execfuncs.lstatus(pid, ppid)
579         else:
580             with self._node_lock:
581                 status = sshfuncs.rstatus(
582                         pid, ppid,
583                         host = self.get("hostname"),
584                         user = self.get("username"),
585                         port = self.get("port"),
586                         agent = True,
587                         identity = self.get("identity"),
588                         server_key = self.get("serverKey")
589                         )
590            
591         return status
592     
593     def kill(self, pid, ppid, sudo = False):
594         out = err = ""
595         proc = None
596         status = self.status(pid, ppid)
597
598         if status == sshfuncs.ProcStatus.RUNNING:
599             if self.localhost:
600                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
601             else:
602                 with self._node_lock:
603                     (out, err), proc = sshfuncs.rkill(
604                         pid, ppid,
605                         host = self.get("hostname"),
606                         user = self.get("username"),
607                         port = self.get("port"),
608                         agent = True,
609                         sudo = sudo,
610                         identity = self.get("identity"),
611                         server_key = self.get("serverKey")
612                         )
613
614         return (out, err), proc
615
616     def copy(self, src, dst):
617         if self.localhost:
618             (out, err), proc = execfuncs.lcopy(source, dest, 
619                     recursive = True,
620                     strict_host_checking = False)
621         else:
622             with self._node_lock:
623                 (out, err), proc = sshfuncs.rcopy(
624                     src, dst, 
625                     port = self.get("port"),
626                     identity = self.get("identity"),
627                     server_key = self.get("serverKey"),
628                     recursive = True,
629                     strict_host_checking = False)
630
631         return (out, err), proc
632
633
634     def upload(self, src, dst, text = False, overwrite = True):
635         """ Copy content to destination
636
637            src  content to copy. Can be a local file, directory or a list of files
638
639            dst  destination path on the remote host (remote is always self.host)
640
641            text src is text input, it must be stored into a temp file before uploading
642         """
643         # If source is a string input 
644         f = None
645         if text and not os.path.isfile(src):
646             # src is text input that should be uploaded as file
647             # create a temporal file with the content to upload
648             f = tempfile.NamedTemporaryFile(delete=False)
649             f.write(src)
650             f.close()
651             src = f.name
652
653         # If dst files should not be overwritten, check that the files do not
654         # exits already 
655         if overwrite == False:
656             src = self.filter_existing_files(src, dst)
657             if not src:
658                 return ("", ""), None 
659
660         if not self.localhost:
661             # Build destination as <user>@<server>:<path>
662             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
663
664         result = self.copy(src, dst)
665
666         # clean up temp file
667         if f:
668             os.remove(f.name)
669
670         return result
671
672     def download(self, src, dst):
673         if not self.localhost:
674             # Build destination as <user>@<server>:<path>
675             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
676         return self.copy(src, dst)
677
678     def install_packages_command(self, packages):
679         command = ""
680         if self.use_rpm:
681             command = rpmfuncs.install_packages_command(self.os, packages)
682         elif self.use_deb:
683             command = debfuncs.install_packages_command(self.os, packages)
684         else:
685             msg = "Error installing packages ( OS not known ) "
686             self.error(msg, self.os)
687             raise RuntimeError, msg
688
689         return command
690
691     def install_packages(self, packages, home, run_home = None):
692         """ Install packages in the Linux host.
693
694         'home' is the directory to upload the package installation script.
695         'run_home' is the directory from where to execute the script.
696         """
697         command = self.install_packages_command(packages)
698
699         run_home = run_home or home
700
701         (out, err), proc = self.run_and_wait(command, run_home, 
702             shfile = os.path.join(home, "instpkg.sh"),
703             pidfile = "instpkg_pidfile",
704             ecodefile = "instpkg_exitcode",
705             stdout = "instpkg_stdout", 
706             stderr = "instpkg_stderr",
707             overwrite = False,
708             raise_on_error = True)
709
710         return (out, err), proc 
711
712     def remove_packages(self, packages, home, run_home = None):
713         """ Uninstall packages from the Linux host.
714
715         'home' is the directory to upload the package un-installation script.
716         'run_home' is the directory from where to execute the script.
717         """
718         if self.use_rpm:
719             command = rpmfuncs.remove_packages_command(self.os, packages)
720         elif self.use_deb:
721             command = debfuncs.remove_packages_command(self.os, packages)
722         else:
723             msg = "Error removing packages ( OS not known ) "
724             self.error(msg)
725             raise RuntimeError, msg
726
727         run_home = run_home or home
728
729         (out, err), proc = self.run_and_wait(command, run_home, 
730             shfile = os.path.join(home, "rmpkg.sh"),
731             pidfile = "rmpkg_pidfile",
732             ecodefile = "rmpkg_exitcode",
733             stdout = "rmpkg_stdout", 
734             stderr = "rmpkg_stderr",
735             overwrite = False,
736             raise_on_error = True)
737          
738         return (out, err), proc 
739
740     def mkdir(self, path, clean = False):
741         if clean:
742             self.rmdir(path)
743
744         return self.execute("mkdir -p %s" % path, with_lock = True)
745
746     def rmdir(self, path):
747         return self.execute("rm -rf %s" % path, with_lock = True)
748         
749     def run_and_wait(self, command, home, 
750             shfile = "cmd.sh",
751             env = None,
752             overwrite = True,
753             pidfile = "pidfile", 
754             ecodefile = "exitcode", 
755             stdin = None, 
756             stdout = "stdout", 
757             stderr = "stderr", 
758             sudo = False,
759             tty = False,
760             raise_on_error = False):
761         """
762         Uploads the 'command' to a bash script in the host.
763         Then runs the script detached in background in the host, and
764         busy-waites until the script finishes executing.
765         """
766
767         if not shfile.startswith("/"):
768             shfile = os.path.join(home, shfile)
769
770         self.upload_command(command, 
771             shfile = shfile, 
772             ecodefile = ecodefile, 
773             env = env,
774             overwrite = overwrite)
775
776         command = "bash %s" % shfile
777         # run command in background in remote host
778         (out, err), proc = self.run(command, home, 
779                 pidfile = pidfile,
780                 stdin = stdin, 
781                 stdout = stdout, 
782                 stderr = stderr, 
783                 sudo = sudo,
784                 tty = tty)
785
786         # check no errors occurred
787         if proc.poll():
788             msg = " Failed to run command '%s' " % command
789             self.error(msg, out, err)
790             if raise_on_error:
791                 raise RuntimeError, msg
792
793         # Wait for pid file to be generated
794         pid, ppid = self.wait_pid(
795                 home = home, 
796                 pidfile = pidfile, 
797                 raise_on_error = raise_on_error)
798
799         # wait until command finishes to execute
800         self.wait_run(pid, ppid)
801       
802         (eout, err), proc = self.check_errors(home,
803             ecodefile = ecodefile,
804             stderr = stderr)
805
806         # Out is what was written in the stderr file
807         if err:
808             msg = " Failed to run command '%s' " % command
809             self.error(msg, eout, err)
810
811             if raise_on_error:
812                 raise RuntimeError, msg
813
814         (out, oerr), proc = self.check_output(home, stdout)
815         
816         return (out, err), proc
817
818     def exitcode(self, home, ecodefile = "exitcode"):
819         """
820         Get the exit code of an application.
821         Returns an integer value with the exit code 
822         """
823         (out, err), proc = self.check_output(home, ecodefile)
824
825         # Succeeded to open file, return exit code in the file
826         if proc.wait() == 0:
827             try:
828                 return int(out.strip())
829             except:
830                 # Error in the content of the file!
831                 return ExitCode.CORRUPTFILE
832
833         # No such file or directory
834         if proc.returncode == 1:
835             return ExitCode.FILENOTFOUND
836         
837         # Other error from 'cat'
838         return ExitCode.ERROR
839
840     def upload_command(self, command, 
841             shfile = "cmd.sh",
842             ecodefile = "exitcode",
843             overwrite = True,
844             env = None):
845         """ Saves the command as a bash script file in the remote host, and
846         forces to save the exit code of the command execution to the ecodefile
847         """
848
849         if not (command.strip().endswith(";") or command.strip().endswith("&")):
850             command += ";"
851       
852         # The exit code of the command will be stored in ecodefile
853         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
854                 'command': command,
855                 'ecodefile': ecodefile,
856                 } 
857
858         # Export environment
859         environ = self.format_environment(env)
860
861         # Add environ to command
862         command = environ + command
863
864         return self.upload(command, shfile, text = True, overwrite = overwrite)
865
866     def format_environment(self, env, inline = False):
867         """ Formats the environment variables for a command to be executed
868         either as an inline command
869         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
870         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
871         """
872         if not env: return ""
873
874         # Remove extra white spaces
875         env = re.sub(r'\s+', ' ', env.strip())
876
877         sep = ";" if inline else "\n"
878         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
879
880     def check_errors(self, home, 
881             ecodefile = "exitcode", 
882             stderr = "stderr"):
883         """ Checks whether errors occurred while running a command.
884         It first checks the exit code for the command, and only if the
885         exit code is an error one it returns the error output.
886
887         """
888         proc = None
889         err = ""
890
891         # get exit code saved in the 'exitcode' file
892         ecode = self.exitcode(home, ecodefile)
893
894         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
895             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
896         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
897             # The process returned an error code or didn't exist. 
898             # Check standard error.
899             (err, eerr), proc = self.check_output(home, stderr)
900
901             # If the stderr file was not found, assume nothing bad happened,
902             # and just ignore the error.
903             # (cat returns 1 for error "No such file or directory")
904             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
905                 err = "" 
906             
907         return ("", err), proc
908  
909     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
910         """ Waits until the pid file for the command is generated, 
911             and returns the pid and ppid of the process """
912         pid = ppid = None
913         delay = 1.0
914
915         for i in xrange(2):
916             pidtuple = self.getpid(home = home, pidfile = pidfile)
917             
918             if pidtuple:
919                 pid, ppid = pidtuple
920                 break
921             else:
922                 time.sleep(delay)
923                 delay = delay * 1.5
924         else:
925             msg = " Failed to get pid for pidfile %s/%s " % (
926                     home, pidfile )
927             self.error(msg)
928             
929             if raise_on_error:
930                 raise RuntimeError, msg
931
932         return pid, ppid
933
934     def wait_run(self, pid, ppid, trial = 0):
935         """ wait for a remote process to finish execution """
936         delay = 1.0
937
938         while True:
939             status = self.status(pid, ppid)
940             
941             if status is ProcStatus.FINISHED:
942                 break
943             elif status is not ProcStatus.RUNNING:
944                 delay = delay * 1.5
945                 time.sleep(delay)
946                 # If it takes more than 20 seconds to start, then
947                 # asume something went wrong
948                 if delay > 20:
949                     break
950             else:
951                 # The app is running, just wait...
952                 time.sleep(0.5)
953
954     def check_output(self, home, filename):
955         """ Retrives content of file """
956         (out, err), proc = self.execute("cat %s" % 
957             os.path.join(home, filename), retry = 1, with_lock = True)
958         return (out, err), proc
959
960     def is_alive(self):
961         """ Checks if host is responsive
962         """
963         if self.localhost:
964             return True
965
966         out = err = ""
967         # The underlying SSH layer will sometimes return an empty
968         # output (even if the command was executed without errors).
969         # To work arround this, repeat the operation N times or
970         # until the result is not empty string
971         retrydelay = 1.0
972         for i in xrange(2):
973             try:
974                 (out, err), proc = self.execute("echo 'ALIVE'",
975                         retry = 5,
976                         blocking = True,
977                         with_lock = True)
978         
979                 if out.find("ALIVE") > -1:
980                     return True
981             except:
982                 trace = traceback.format_exc()
983                 msg = "Unresponsive host. Error reaching host: %s " % trace
984                 self.error(msg, out, err)
985                 return False
986
987             time.sleep(min(30.0, retrydelay))
988             retrydelay *= 1.5
989
990         if out.find("ALIVE") > -1:
991             return True
992         else:
993             msg = "Unresponsive host. Wrong answer. "
994             self.error(msg, out, err)
995             return False
996
997     def find_home(self):
998         """ Retrieves host home directory
999         """
1000         # The underlying SSH layer will sometimes return an empty
1001         # output (even if the command was executed without errors).
1002         # To work arround this, repeat the operation N times or
1003         # until the result is not empty string
1004         retrydelay = 1.0
1005         for i in xrange(2):
1006             try:
1007                 (out, err), proc = self.execute("echo ${HOME}",
1008                         retry = 5,
1009                         blocking = True,
1010                         with_lock = True)
1011         
1012                 if out.strip() != "":
1013                     self._home_dir =  out.strip()
1014                     break
1015             except:
1016                 trace = traceback.format_exc()
1017                 msg = "Impossible to retrieve HOME directory" % trace
1018                 self.error(msg, out, err)
1019                 return False
1020
1021             time.sleep(min(30.0, retrydelay))
1022             retrydelay *= 1.5
1023
1024         if not self._home_dir:
1025             msg = "Impossible to retrieve HOME directory"
1026             self.error(msg, out, err)
1027             raise RuntimeError, msg
1028
1029     def filter_existing_files(self, src, dst):
1030         """ Removes files that already exist in the Linux host from src list
1031         """
1032         # construct a dictionary with { dst: src }
1033         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
1034             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1035
1036         command = []
1037         for d in dests.keys():
1038             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1039
1040         command = ";".join(command)
1041
1042         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1043     
1044         for d in dests.keys():
1045             if out.find(d) > -1:
1046                 del dests[d]
1047
1048         if not dests:
1049             return ""
1050
1051         return " ".join(dests.values())
1052