merged ex_shutdown into nepi-3-dev
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay, failtrap
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.ExecReadOnly)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.ExecReadOnly)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.ExecReadOnly)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.ExecReadOnly)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.ExecReadOnly)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.ExecReadOnly)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.ExecReadOnly)
190
191         cls._register_attribute(hostname)
192         cls._register_attribute(username)
193         cls._register_attribute(port)
194         cls._register_attribute(home)
195         cls._register_attribute(identity)
196         cls._register_attribute(server_key)
197         cls._register_attribute(clean_home)
198         cls._register_attribute(clean_experiment)
199         cls._register_attribute(clean_processes)
200         cls._register_attribute(tear_down)
201
202     def __init__(self, ec, guid):
203         super(LinuxNode, self).__init__(ec, guid)
204         self._os = None
205         # home directory at Linux host
206         self._home_dir = ""
207         
208         # lock to prevent concurrent applications on the same node,
209         # to execute commands at the same time. There are potential
210         # concurrency issues when using SSH to a same host from 
211         # multiple threads. There are also possible operational 
212         # issues, e.g. an application querying the existence 
213         # of a file or folder prior to its creation, and another 
214         # application creating the same file or folder in between.
215         self._node_lock = threading.Lock()
216     
217     def log_message(self, msg):
218         return " guid %d - host %s - %s " % (self.guid, 
219                 self.get("hostname"), msg)
220
221     @property
222     def home_dir(self):
223         home = self.get("home") or ""
224         if not home.startswith("/"):
225            home = os.path.join(self._home_dir, home) 
226         return home
227
228     @property
229     def usr_dir(self):
230         return os.path.join(self.home_dir, "nepi-usr")
231
232     @property
233     def lib_dir(self):
234         return os.path.join(self.usr_dir, "lib")
235
236     @property
237     def bin_dir(self):
238         return os.path.join(self.usr_dir, "bin")
239
240     @property
241     def src_dir(self):
242         return os.path.join(self.usr_dir, "src")
243
244     @property
245     def share_dir(self):
246         return os.path.join(self.usr_dir, "share")
247
248     @property
249     def exp_dir(self):
250         return os.path.join(self.home_dir, "nepi-exp")
251
252     @property
253     def exp_home(self):
254         return os.path.join(self.exp_dir, self.ec.exp_id)
255
256     @property
257     def node_home(self):
258         return os.path.join(self.exp_home, "node-%d" % self.guid)
259
260     @property
261     def run_home(self):
262         return os.path.join(self.node_home, self.ec.run_id)
263
264     @property
265     def os(self):
266         if self._os:
267             return self._os
268
269         if (not self.get("hostname") or not self.get("username")):
270             msg = "Can't resolve OS, insufficient data "
271             self.error(msg)
272             raise RuntimeError, msg
273
274         out = self.get_os()
275
276         if out.find("Fedora release 8") == 0:
277             self._os = OSType.FEDORA_8
278         elif out.find("Fedora release 12") == 0:
279             self._os = OSType.FEDORA_12
280         elif out.find("Fedora release 14") == 0:
281             self._os = OSType.FEDORA_14
282         elif out.find("Fedora release") == 0:
283             self._os = OSType.FEDORA
284         elif out.find("Debian") == 0: 
285             self._os = OSType.DEBIAN
286         elif out.find("Ubuntu") ==0:
287             self._os = OSType.UBUNTU
288         else:
289             msg = "Unsupported OS"
290             self.error(msg, out)
291             raise RuntimeError, "%s - %s " %( msg, out )
292
293         return self._os
294
295     def get_os(self):
296         # The underlying SSH layer will sometimes return an empty
297         # output (even if the command was executed without errors).
298         # To work arround this, repeat the operation N times or
299         # until the result is not empty string
300         out = ""
301         retrydelay = 1.0
302         for i in xrange(2):
303             try:
304                 (out, err), proc = self.execute("cat /etc/issue", 
305                         retry = 5,
306                         with_lock = True,
307                         blocking = True)
308
309                 if out.strip() != "":
310                     return out
311             except:
312                 trace = traceback.format_exc()
313                 msg = "Error detecting OS: %s " % trace
314                 self.error(msg, out, err)
315                 return False
316
317             time.sleep(min(30.0, retrydelay))
318             retrydelay *= 1.5
319
320     @property
321     def use_deb(self):
322         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
323
324     @property
325     def use_rpm(self):
326         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
327                 OSType.FEDORA]
328
329     @property
330     def localhost(self):
331         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
332
333     @failtrap
334     def provision(self):
335         # check if host is alive
336         if not self.is_alive():
337             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
338             self.error(msg)
339             raise RuntimeError, msg
340
341         self.find_home()
342
343         if self.get("cleanProcesses"):
344             self.clean_processes()
345
346         if self.get("cleanHome"):
347             self.clean_home()
348  
349         if self.get("cleanExperiment"):
350             self.clean_experiment()
351     
352         # Create shared directory structure
353         self.mkdir(self.lib_dir)
354         self.mkdir(self.bin_dir)
355         self.mkdir(self.src_dir)
356         self.mkdir(self.share_dir)
357
358         # Create experiment node home directory
359         self.mkdir(self.node_home)
360
361         super(LinuxNode, self).provision()
362
363     @failtrap
364     def deploy(self):
365         if self.state == ResourceState.NEW:
366             self.info("Deploying node")
367             self.discover()
368             self.provision()
369
370         # Node needs to wait until all associated interfaces are 
371         # ready before it can finalize deployment
372         from nepi.resources.linux.interface import LinuxInterface
373         ifaces = self.get_connected(LinuxInterface.rtype())
374         for iface in ifaces:
375             if iface.state < ResourceState.READY:
376                 self.ec.schedule(reschedule_delay, self.deploy)
377                 return 
378
379         super(LinuxNode, self).deploy()
380
381     def release(self):
382         try:
383             rms = self.get_connected()
384             for rm in rms:
385                 # Node needs to wait until all associated RMs are released
386                 # before it can be released
387                 if rm.state < ResourceState.STOPPED:
388                     self.ec.schedule(reschedule_delay, self.release)
389                     return 
390
391             tear_down = self.get("tearDown")
392             if tear_down:
393                 self.execute(tear_down)
394
395             self.clean_processes()
396         except:
397             import traceback
398             err = traceback.format_exc()
399             self.error(err)
400
401         super(LinuxNode, self).release()
402
403     def valid_connection(self, guid):
404         # TODO: Validate!
405         return True
406
407     def clean_processes(self, killer = False):
408         self.info("Cleaning up processes")
409         
410         if killer:
411             # Hardcore kill
412             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
413                 "sudo -S killall python tcpdump || /bin/true ; " +
414                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
415                 "sudo -S killall -u root || /bin/true ; " +
416                 "sudo -S killall -u root || /bin/true ; ")
417         else:
418             # Be gentler...
419             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
420                 "sudo -S killall tcpdump || /bin/true ; " +
421                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
422                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
423
424         out = err = ""
425         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
426             
427     def clean_home(self):
428         """ Cleans all NEPI related folders in the Linux host
429         """
430         self.info("Cleaning up home")
431         
432         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
433                 self.home_dir )
434
435         return self.execute(cmd, with_lock = True)
436
437     def clean_experiment(self):
438         """ Cleans all experiment related files in the Linux host.
439         It preserves NEPI files and folders that have a multi experiment
440         scope.
441         """
442         self.info("Cleaning up experiment files")
443         
444         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
445                 self.exp_dir,
446                 self.ec.exp_id )
447             
448         return self.execute(cmd, with_lock = True)
449
450     def execute(self, command,
451             sudo = False,
452             stdin = None, 
453             env = None,
454             tty = False,
455             forward_x11 = False,
456             timeout = None,
457             retry = 3,
458             err_on_timeout = True,
459             connect_timeout = 30,
460             strict_host_checking = False,
461             persistent = True,
462             blocking = True,
463             with_lock = False
464             ):
465         """ Notice that this invocation will block until the
466         execution finishes. If this is not the desired behavior,
467         use 'run' instead."""
468
469         if self.localhost:
470             (out, err), proc = execfuncs.lexec(command, 
471                     user = user,
472                     sudo = sudo,
473                     stdin = stdin,
474                     env = env)
475         else:
476             if with_lock:
477                 with self._node_lock:
478                     (out, err), proc = sshfuncs.rexec(
479                         command, 
480                         host = self.get("hostname"),
481                         user = self.get("username"),
482                         port = self.get("port"),
483                         agent = True,
484                         sudo = sudo,
485                         stdin = stdin,
486                         identity = self.get("identity"),
487                         server_key = self.get("serverKey"),
488                         env = env,
489                         tty = tty,
490                         forward_x11 = forward_x11,
491                         timeout = timeout,
492                         retry = retry,
493                         err_on_timeout = err_on_timeout,
494                         connect_timeout = connect_timeout,
495                         persistent = persistent,
496                         blocking = blocking, 
497                         strict_host_checking = strict_host_checking
498                         )
499             else:
500                 (out, err), proc = sshfuncs.rexec(
501                     command, 
502                     host = self.get("hostname"),
503                     user = self.get("username"),
504                     port = self.get("port"),
505                     agent = True,
506                     sudo = sudo,
507                     stdin = stdin,
508                     identity = self.get("identity"),
509                     server_key = self.get("serverKey"),
510                     env = env,
511                     tty = tty,
512                     forward_x11 = forward_x11,
513                     timeout = timeout,
514                     retry = retry,
515                     err_on_timeout = err_on_timeout,
516                     connect_timeout = connect_timeout,
517                     persistent = persistent,
518                     blocking = blocking, 
519                     strict_host_checking = strict_host_checking
520                     )
521
522         return (out, err), proc
523
524     def run(self, command, home,
525             create_home = False,
526             pidfile = 'pidfile',
527             stdin = None, 
528             stdout = 'stdout', 
529             stderr = 'stderr', 
530             sudo = False,
531             tty = False):
532         
533         self.debug("Running command '%s'" % command)
534         
535         if self.localhost:
536             (out, err), proc = execfuncs.lspawn(command, pidfile, 
537                     stdout = stdout, 
538                     stderr = stderr, 
539                     stdin = stdin, 
540                     home = home, 
541                     create_home = create_home, 
542                     sudo = sudo,
543                     user = user) 
544         else:
545             with self._node_lock:
546                 (out, err), proc = sshfuncs.rspawn(
547                     command,
548                     pidfile = pidfile,
549                     home = home,
550                     create_home = create_home,
551                     stdin = stdin if stdin is not None else '/dev/null',
552                     stdout = stdout if stdout else '/dev/null',
553                     stderr = stderr if stderr else '/dev/null',
554                     sudo = sudo,
555                     host = self.get("hostname"),
556                     user = self.get("username"),
557                     port = self.get("port"),
558                     agent = True,
559                     identity = self.get("identity"),
560                     server_key = self.get("serverKey"),
561                     tty = tty
562                     )
563
564         return (out, err), proc
565
566     def getpid(self, home, pidfile = "pidfile"):
567         if self.localhost:
568             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
569         else:
570             with self._node_lock:
571                 pidtuple = sshfuncs.rgetpid(
572                     os.path.join(home, pidfile),
573                     host = self.get("hostname"),
574                     user = self.get("username"),
575                     port = self.get("port"),
576                     agent = True,
577                     identity = self.get("identity"),
578                     server_key = self.get("serverKey")
579                     )
580         
581         return pidtuple
582
583     def status(self, pid, ppid):
584         if self.localhost:
585             status = execfuncs.lstatus(pid, ppid)
586         else:
587             with self._node_lock:
588                 status = sshfuncs.rstatus(
589                         pid, ppid,
590                         host = self.get("hostname"),
591                         user = self.get("username"),
592                         port = self.get("port"),
593                         agent = True,
594                         identity = self.get("identity"),
595                         server_key = self.get("serverKey")
596                         )
597            
598         return status
599     
600     def kill(self, pid, ppid, sudo = False):
601         out = err = ""
602         proc = None
603         status = self.status(pid, ppid)
604
605         if status == sshfuncs.ProcStatus.RUNNING:
606             if self.localhost:
607                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
608             else:
609                 with self._node_lock:
610                     (out, err), proc = sshfuncs.rkill(
611                         pid, ppid,
612                         host = self.get("hostname"),
613                         user = self.get("username"),
614                         port = self.get("port"),
615                         agent = True,
616                         sudo = sudo,
617                         identity = self.get("identity"),
618                         server_key = self.get("serverKey")
619                         )
620
621         return (out, err), proc
622
623     def copy(self, src, dst):
624         if self.localhost:
625             (out, err), proc = execfuncs.lcopy(source, dest, 
626                     recursive = True,
627                     strict_host_checking = False)
628         else:
629             with self._node_lock:
630                 (out, err), proc = sshfuncs.rcopy(
631                     src, dst, 
632                     port = self.get("port"),
633                     identity = self.get("identity"),
634                     server_key = self.get("serverKey"),
635                     recursive = True,
636                     strict_host_checking = False)
637
638         return (out, err), proc
639
640     def upload(self, src, dst, text = False, overwrite = True):
641         """ Copy content to destination
642
643            src  content to copy. Can be a local file, directory or a list of files
644
645            dst  destination path on the remote host (remote is always self.host)
646
647            text src is text input, it must be stored into a temp file before uploading
648         """
649         # If source is a string input 
650         f = None
651         if text and not os.path.isfile(src):
652             # src is text input that should be uploaded as file
653             # create a temporal file with the content to upload
654             f = tempfile.NamedTemporaryFile(delete=False)
655             f.write(src)
656             f.close()
657             src = f.name
658
659         # If dst files should not be overwritten, check that the files do not
660         # exits already 
661         if overwrite == False:
662             src = self.filter_existing_files(src, dst)
663             if not src:
664                 return ("", ""), None 
665
666         if not self.localhost:
667             # Build destination as <user>@<server>:<path>
668             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
669
670         result = self.copy(src, dst)
671
672         # clean up temp file
673         if f:
674             os.remove(f.name)
675
676         return result
677
678     def download(self, src, dst):
679         if not self.localhost:
680             # Build destination as <user>@<server>:<path>
681             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
682         return self.copy(src, dst)
683
684     def install_packages_command(self, packages):
685         command = ""
686         if self.use_rpm:
687             command = rpmfuncs.install_packages_command(self.os, packages)
688         elif self.use_deb:
689             command = debfuncs.install_packages_command(self.os, packages)
690         else:
691             msg = "Error installing packages ( OS not known ) "
692             self.error(msg, self.os)
693             raise RuntimeError, msg
694
695         return command
696
697     def install_packages(self, packages, home, run_home = None):
698         """ Install packages in the Linux host.
699
700         'home' is the directory to upload the package installation script.
701         'run_home' is the directory from where to execute the script.
702         """
703         command = self.install_packages_command(packages)
704
705         run_home = run_home or home
706
707         (out, err), proc = self.run_and_wait(command, run_home, 
708             shfile = os.path.join(home, "instpkg.sh"),
709             pidfile = "instpkg_pidfile",
710             ecodefile = "instpkg_exitcode",
711             stdout = "instpkg_stdout", 
712             stderr = "instpkg_stderr",
713             overwrite = False,
714             raise_on_error = True)
715
716         return (out, err), proc 
717
718     def remove_packages(self, packages, home, run_home = None):
719         """ Uninstall packages from the Linux host.
720
721         'home' is the directory to upload the package un-installation script.
722         'run_home' is the directory from where to execute the script.
723         """
724         if self.use_rpm:
725             command = rpmfuncs.remove_packages_command(self.os, packages)
726         elif self.use_deb:
727             command = debfuncs.remove_packages_command(self.os, packages)
728         else:
729             msg = "Error removing packages ( OS not known ) "
730             self.error(msg)
731             raise RuntimeError, msg
732
733         run_home = run_home or home
734
735         (out, err), proc = self.run_and_wait(command, run_home, 
736             shfile = os.path.join(home, "rmpkg.sh"),
737             pidfile = "rmpkg_pidfile",
738             ecodefile = "rmpkg_exitcode",
739             stdout = "rmpkg_stdout", 
740             stderr = "rmpkg_stderr",
741             overwrite = False,
742             raise_on_error = True)
743          
744         return (out, err), proc 
745
746     def mkdir(self, path, clean = False):
747         if clean:
748             self.rmdir(path)
749
750         return self.execute("mkdir -p %s" % path, with_lock = True)
751
752     def rmdir(self, path):
753         return self.execute("rm -rf %s" % path, with_lock = True)
754         
755     def run_and_wait(self, command, home, 
756             shfile = "cmd.sh",
757             env = None,
758             overwrite = True,
759             pidfile = "pidfile", 
760             ecodefile = "exitcode", 
761             stdin = None, 
762             stdout = "stdout", 
763             stderr = "stderr", 
764             sudo = False,
765             tty = False,
766             raise_on_error = False):
767         """
768         Uploads the 'command' to a bash script in the host.
769         Then runs the script detached in background in the host, and
770         busy-waites until the script finishes executing.
771         """
772
773         if not shfile.startswith("/"):
774             shfile = os.path.join(home, shfile)
775
776         self.upload_command(command, 
777             shfile = shfile, 
778             ecodefile = ecodefile, 
779             env = env,
780             overwrite = overwrite)
781
782         command = "bash %s" % shfile
783         # run command in background in remote host
784         (out, err), proc = self.run(command, home, 
785                 pidfile = pidfile,
786                 stdin = stdin, 
787                 stdout = stdout, 
788                 stderr = stderr, 
789                 sudo = sudo,
790                 tty = tty)
791
792         # check no errors occurred
793         if proc.poll():
794             msg = " Failed to run command '%s' " % command
795             self.error(msg, out, err)
796             if raise_on_error:
797                 raise RuntimeError, msg
798
799         # Wait for pid file to be generated
800         pid, ppid = self.wait_pid(
801                 home = home, 
802                 pidfile = pidfile, 
803                 raise_on_error = raise_on_error)
804
805         # wait until command finishes to execute
806         self.wait_run(pid, ppid)
807       
808         (eout, err), proc = self.check_errors(home,
809             ecodefile = ecodefile,
810             stderr = stderr)
811
812         # Out is what was written in the stderr file
813         if err:
814             msg = " Failed to run command '%s' " % command
815             self.error(msg, eout, err)
816
817             if raise_on_error:
818                 raise RuntimeError, msg
819
820         (out, oerr), proc = self.check_output(home, stdout)
821         
822         return (out, err), proc
823
824     def exitcode(self, home, ecodefile = "exitcode"):
825         """
826         Get the exit code of an application.
827         Returns an integer value with the exit code 
828         """
829         (out, err), proc = self.check_output(home, ecodefile)
830
831         # Succeeded to open file, return exit code in the file
832         if proc.wait() == 0:
833             try:
834                 return int(out.strip())
835             except:
836                 # Error in the content of the file!
837                 return ExitCode.CORRUPTFILE
838
839         # No such file or directory
840         if proc.returncode == 1:
841             return ExitCode.FILENOTFOUND
842         
843         # Other error from 'cat'
844         return ExitCode.ERROR
845
846     def upload_command(self, command, 
847             shfile = "cmd.sh",
848             ecodefile = "exitcode",
849             overwrite = True,
850             env = None):
851         """ Saves the command as a bash script file in the remote host, and
852         forces to save the exit code of the command execution to the ecodefile
853         """
854
855         if not (command.strip().endswith(";") or command.strip().endswith("&")):
856             command += ";"
857       
858         # The exit code of the command will be stored in ecodefile
859         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
860                 'command': command,
861                 'ecodefile': ecodefile,
862                 } 
863
864         # Export environment
865         environ = self.format_environment(env)
866
867         # Add environ to command
868         command = environ + command
869
870         return self.upload(command, shfile, text = True, overwrite = overwrite)
871
872     def format_environment(self, env, inline = False):
873         """ Formats the environment variables for a command to be executed
874         either as an inline command
875         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
876         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
877         """
878         if not env: return ""
879
880         # Remove extra white spaces
881         env = re.sub(r'\s+', ' ', env.strip())
882
883         sep = ";" if inline else "\n"
884         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
885
886     def check_errors(self, home, 
887             ecodefile = "exitcode", 
888             stderr = "stderr"):
889         """ Checks whether errors occurred while running a command.
890         It first checks the exit code for the command, and only if the
891         exit code is an error one it returns the error output.
892
893         """
894         proc = None
895         err = ""
896
897         # get exit code saved in the 'exitcode' file
898         ecode = self.exitcode(home, ecodefile)
899
900         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
901             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
902         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
903             # The process returned an error code or didn't exist. 
904             # Check standard error.
905             (err, eerr), proc = self.check_output(home, stderr)
906
907             # If the stderr file was not found, assume nothing bad happened,
908             # and just ignore the error.
909             # (cat returns 1 for error "No such file or directory")
910             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
911                 err = "" 
912             
913         return ("", err), proc
914  
915     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
916         """ Waits until the pid file for the command is generated, 
917             and returns the pid and ppid of the process """
918         pid = ppid = None
919         delay = 1.0
920
921         for i in xrange(2):
922             pidtuple = self.getpid(home = home, pidfile = pidfile)
923             
924             if pidtuple:
925                 pid, ppid = pidtuple
926                 break
927             else:
928                 time.sleep(delay)
929                 delay = delay * 1.5
930         else:
931             msg = " Failed to get pid for pidfile %s/%s " % (
932                     home, pidfile )
933             self.error(msg)
934             
935             if raise_on_error:
936                 raise RuntimeError, msg
937
938         return pid, ppid
939
940     def wait_run(self, pid, ppid, trial = 0):
941         """ wait for a remote process to finish execution """
942         delay = 1.0
943
944         while True:
945             status = self.status(pid, ppid)
946             
947             if status is ProcStatus.FINISHED:
948                 break
949             elif status is not ProcStatus.RUNNING:
950                 delay = delay * 1.5
951                 time.sleep(delay)
952                 # If it takes more than 20 seconds to start, then
953                 # asume something went wrong
954                 if delay > 20:
955                     break
956             else:
957                 # The app is running, just wait...
958                 time.sleep(0.5)
959
960     def check_output(self, home, filename):
961         """ Retrives content of file """
962         (out, err), proc = self.execute("cat %s" % 
963             os.path.join(home, filename), retry = 1, with_lock = True)
964         return (out, err), proc
965
966     def is_alive(self):
967         """ Checks if host is responsive
968         """
969         if self.localhost:
970             return True
971
972         out = err = ""
973         # The underlying SSH layer will sometimes return an empty
974         # output (even if the command was executed without errors).
975         # To work arround this, repeat the operation N times or
976         # until the result is not empty string
977         retrydelay = 1.0
978         for i in xrange(2):
979             try:
980                 (out, err), proc = self.execute("echo 'ALIVE'",
981                         retry = 5,
982                         blocking = True,
983                         with_lock = True)
984         
985                 if out.find("ALIVE") > -1:
986                     return True
987             except:
988                 trace = traceback.format_exc()
989                 msg = "Unresponsive host. Error reaching host: %s " % trace
990                 self.error(msg, out, err)
991                 return False
992
993             time.sleep(min(30.0, retrydelay))
994             retrydelay *= 1.5
995
996         if out.find("ALIVE") > -1:
997             return True
998         else:
999             msg = "Unresponsive host. Wrong answer. "
1000             self.error(msg, out, err)
1001             return False
1002
1003     def find_home(self):
1004         """ Retrieves host home directory
1005         """
1006         # The underlying SSH layer will sometimes return an empty
1007         # output (even if the command was executed without errors).
1008         # To work arround this, repeat the operation N times or
1009         # until the result is not empty string
1010         retrydelay = 1.0
1011         for i in xrange(2):
1012             try:
1013                 (out, err), proc = self.execute("echo ${HOME}",
1014                         retry = 5,
1015                         blocking = True,
1016                         with_lock = True)
1017         
1018                 if out.strip() != "":
1019                     self._home_dir =  out.strip()
1020                     break
1021             except:
1022                 trace = traceback.format_exc()
1023                 msg = "Impossible to retrieve HOME directory" % trace
1024                 self.error(msg, out, err)
1025                 return False
1026
1027             time.sleep(min(30.0, retrydelay))
1028             retrydelay *= 1.5
1029
1030         if not self._home_dir:
1031             msg = "Impossible to retrieve HOME directory"
1032             self.error(msg, out, err)
1033             raise RuntimeError, msg
1034
1035     def filter_existing_files(self, src, dst):
1036         """ Removes files that already exist in the Linux host from src list
1037         """
1038         # construct a dictionary with { dst: src }
1039         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
1040             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1041
1042         command = []
1043         for d in dests.keys():
1044             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1045
1046         command = ";".join(command)
1047
1048         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1049     
1050         for d in dests.keys():
1051             if out.find(d) > -1:
1052                 del dests[d]
1053
1054         if not dests:
1055             return ""
1056
1057         return " ".join(dests.values())
1058