Trying to make LinuxNS3Simulator to deploy remotely ....
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.Design)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.Design)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.Design)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.Design)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.Design)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.Design)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.Design)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.Design)
190
191         cls._register_attribute(hostname)
192         cls._register_attribute(username)
193         cls._register_attribute(port)
194         cls._register_attribute(home)
195         cls._register_attribute(identity)
196         cls._register_attribute(server_key)
197         cls._register_attribute(clean_home)
198         cls._register_attribute(clean_experiment)
199         cls._register_attribute(clean_processes)
200         cls._register_attribute(tear_down)
201
202     def __init__(self, ec, guid):
203         super(LinuxNode, self).__init__(ec, guid)
204         self._os = None
205         # home directory at Linux host
206         self._home_dir = ""
207         
208         # lock to prevent concurrent applications on the same node,
209         # to execute commands at the same time. There are potential
210         # concurrency issues when using SSH to a same host from 
211         # multiple threads. There are also possible operational 
212         # issues, e.g. an application querying the existence 
213         # of a file or folder prior to its creation, and another 
214         # application creating the same file or folder in between.
215         self._node_lock = threading.Lock()
216     
217     def log_message(self, msg):
218         return " guid %d - host %s - %s " % (self.guid, 
219                 self.get("hostname"), msg)
220
221     @property
222     def home_dir(self):
223         home = self.get("home") or ""
224         if not home.startswith("/"):
225            home = os.path.join(self._home_dir, home) 
226         return home
227
228     @property
229     def usr_dir(self):
230         return os.path.join(self.home_dir, "nepi-usr")
231
232     @property
233     def lib_dir(self):
234         return os.path.join(self.usr_dir, "lib")
235
236     @property
237     def bin_dir(self):
238         return os.path.join(self.usr_dir, "bin")
239
240     @property
241     def src_dir(self):
242         return os.path.join(self.usr_dir, "src")
243
244     @property
245     def share_dir(self):
246         return os.path.join(self.usr_dir, "share")
247
248     @property
249     def exp_dir(self):
250         return os.path.join(self.home_dir, "nepi-exp")
251
252     @property
253     def exp_home(self):
254         return os.path.join(self.exp_dir, self.ec.exp_id)
255
256     @property
257     def node_home(self):
258         return os.path.join(self.exp_home, "node-%d" % self.guid)
259
260     @property
261     def run_home(self):
262         return os.path.join(self.node_home, self.ec.run_id)
263
264     @property
265     def os(self):
266         if self._os:
267             return self._os
268
269         if (not self.get("hostname") or not self.get("username")):
270             msg = "Can't resolve OS, insufficient data "
271             self.error(msg)
272             raise RuntimeError, msg
273
274         out = self.get_os()
275
276         if out.find("Fedora release 8") == 0:
277             self._os = OSType.FEDORA_8
278         elif out.find("Fedora release 12") == 0:
279             self._os = OSType.FEDORA_12
280         elif out.find("Fedora release 14") == 0:
281             self._os = OSType.FEDORA_14
282         elif out.find("Fedora release") == 0:
283             self._os = OSType.FEDORA
284         elif out.find("Debian") == 0: 
285             self._os = OSType.DEBIAN
286         elif out.find("Ubuntu") ==0:
287             self._os = OSType.UBUNTU
288         else:
289             msg = "Unsupported OS"
290             self.error(msg, out)
291             raise RuntimeError, "%s - %s " %( msg, out )
292
293         return self._os
294
295     def get_os(self):
296         # The underlying SSH layer will sometimes return an empty
297         # output (even if the command was executed without errors).
298         # To work arround this, repeat the operation N times or
299         # until the result is not empty string
300         out = ""
301         try:
302             (out, err), proc = self.execute("cat /etc/issue", 
303                     with_lock = True,
304                     blocking = True)
305         except:
306             trace = traceback.format_exc()
307             msg = "Error detecting OS: %s " % trace
308             self.error(msg, out, err)
309         
310         return out
311
312     @property
313     def use_deb(self):
314         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
315
316     @property
317     def use_rpm(self):
318         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
319                 OSType.FEDORA]
320
321     @property
322     def localhost(self):
323         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
324
325     def do_provision(self):
326         # check if host is alive
327         if not self.is_alive():
328             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
329             self.error(msg)
330             raise RuntimeError, msg
331
332         self.find_home()
333
334         if self.get("cleanProcesses"):
335             self.clean_processes()
336
337         if self.get("cleanHome"):
338             self.clean_home()
339  
340         if self.get("cleanExperiment"):
341             self.clean_experiment()
342     
343         # Create shared directory structure
344         self.mkdir(self.lib_dir)
345         self.mkdir(self.bin_dir)
346         self.mkdir(self.src_dir)
347         self.mkdir(self.share_dir)
348
349         # Create experiment node home directory
350         self.mkdir(self.node_home)
351
352         super(LinuxNode, self).do_provision()
353
354     def do_deploy(self):
355         if self.state == ResourceState.NEW:
356             self.info("Deploying node")
357             self.do_discover()
358             self.do_provision()
359
360         # Node needs to wait until all associated interfaces are 
361         # ready before it can finalize deployment
362         from nepi.resources.linux.interface import LinuxInterface
363         ifaces = self.get_connected(LinuxInterface.get_rtype())
364         for iface in ifaces:
365             if iface.state < ResourceState.READY:
366                 self.ec.schedule(reschedule_delay, self.deploy)
367                 return 
368
369         super(LinuxNode, self).do_deploy()
370
371     def do_release(self):
372         rms = self.get_connected()
373         for rm in rms:
374             # Node needs to wait until all associated RMs are released
375             # before it can be released
376             if rm.state != ResourceState.RELEASED:
377                 self.ec.schedule(reschedule_delay, self.release)
378                 return 
379
380         tear_down = self.get("tearDown")
381         if tear_down:
382             self.execute(tear_down)
383
384         self.clean_processes()
385
386         super(LinuxNode, self).do_release()
387
388     def valid_connection(self, guid):
389         # TODO: Validate!
390         return True
391
392     def clean_processes(self):
393         self.info("Cleaning up processes")
394         
395         cmd = ("sudo -S killall tcpdump || /bin/true ; " +
396             "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
397             "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
398
399         out = err = ""
400         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
401
402     def clean_home(self):
403         """ Cleans all NEPI related folders in the Linux host
404         """
405         self.info("Cleaning up home")
406         
407         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
408                 self.home_dir )
409
410         return self.execute(cmd, with_lock = True)
411
412     def clean_experiment(self):
413         """ Cleans all experiment related files in the Linux host.
414         It preserves NEPI files and folders that have a multi experiment
415         scope.
416         """
417         self.info("Cleaning up experiment files")
418         
419         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
420                 self.exp_dir,
421                 self.ec.exp_id )
422             
423         return self.execute(cmd, with_lock = True)
424
425     def socat(self, local_socket_name, 
426         remote_socket_name,
427         sudo = False,
428         identity = None,
429         server_key = None,
430         env = None,
431         tty = False,
432         connect_timeout = 30,
433         retry = 3,
434         strict_host_checking = True):
435         """ Connectes a local and a remote UNIX socket through SSH using socat """ 
436
437         if self.localhost:
438             return (None, None), None
439         else:
440             return sshfuncs.socat(
441                 local_socket_name,
442                 remote_socket_name,
443                 host = self.get("hostname"),
444                 user = self.get("username"),
445                 port = self.get("port"),
446                 agent = True,
447                 sudo = sudo,
448                 identity = self.get("identity"),
449                 server_key = self.get("serverKey"),
450                 env = env,
451                 tty = tty,
452                 retry = retry,
453                 connect_timeout = connect_timeout,
454                 strict_host_checking = strict_host_checking
455                 )
456
457     def execute(self, command,
458             sudo = False,
459             env = None,
460             tty = False,
461             forward_x11 = False,
462             retry = 3,
463             connect_timeout = 30,
464             strict_host_checking = False,
465             persistent = True,
466             blocking = True,
467             with_lock = False
468             ):
469         """ Notice that this invocation will block until the
470         execution finishes. If this is not the desired behavior,
471         use 'run' instead."""
472
473         if self.localhost:
474             (out, err), proc = execfuncs.lexec(command, 
475                     user = self.get("username"), # still problem with localhost
476                     sudo = sudo,
477                     env = env)
478         else:
479             if with_lock:
480                 # If the execute command is blocking, we don't want to keep
481                 # the node lock. This lock is used to avoid race conditions
482                 # when creating the ControlMaster sockets. A more elegant
483                 # solution is needed.
484                 with self._node_lock:
485                     (out, err), proc = sshfuncs.rexec(
486                         command, 
487                         host = self.get("hostname"),
488                         user = self.get("username"),
489                         port = self.get("port"),
490                         agent = True,
491                         sudo = sudo,
492                         identity = self.get("identity"),
493                         server_key = self.get("serverKey"),
494                         env = env,
495                         tty = tty,
496                         forward_x11 = forward_x11,
497                         retry = retry,
498                         connect_timeout = connect_timeout,
499                         persistent = persistent,
500                         blocking = blocking, 
501                         strict_host_checking = strict_host_checking
502                         )
503             else:
504                 (out, err), proc = sshfuncs.rexec(
505                     command, 
506                     host = self.get("hostname"),
507                     user = self.get("username"),
508                     port = self.get("port"),
509                     agent = True,
510                     sudo = sudo,
511                     identity = self.get("identity"),
512                     server_key = self.get("serverKey"),
513                     env = env,
514                     tty = tty,
515                     forward_x11 = forward_x11,
516                     retry = retry,
517                     connect_timeout = connect_timeout,
518                     persistent = persistent,
519                     blocking = blocking, 
520                     strict_host_checking = strict_host_checking
521                     )
522
523         return (out, err), proc
524
525     def run(self, command, home,
526             create_home = False,
527             pidfile = 'pidfile',
528             stdin = None, 
529             stdout = 'stdout', 
530             stderr = 'stderr', 
531             sudo = False,
532             tty = False):
533         
534         self.debug("Running command '%s'" % command)
535         
536         if self.localhost:
537             (out, err), proc = execfuncs.lspawn(command, pidfile, 
538                     stdout = stdout, 
539                     stderr = stderr, 
540                     stdin = stdin, 
541                     home = home, 
542                     create_home = create_home, 
543                     sudo = sudo,
544                     user = user) 
545         else:
546             with self._node_lock:
547                 (out, err), proc = sshfuncs.rspawn(
548                     command,
549                     pidfile = pidfile,
550                     home = home,
551                     create_home = create_home,
552                     stdin = stdin if stdin is not None else '/dev/null',
553                     stdout = stdout if stdout else '/dev/null',
554                     stderr = stderr if stderr else '/dev/null',
555                     sudo = sudo,
556                     host = self.get("hostname"),
557                     user = self.get("username"),
558                     port = self.get("port"),
559                     agent = True,
560                     identity = self.get("identity"),
561                     server_key = self.get("serverKey"),
562                     tty = tty
563                     )
564
565         return (out, err), proc
566
567     def getpid(self, home, pidfile = "pidfile"):
568         if self.localhost:
569             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
570         else:
571             with self._node_lock:
572                 pidtuple = sshfuncs.rgetpid(
573                     os.path.join(home, pidfile),
574                     host = self.get("hostname"),
575                     user = self.get("username"),
576                     port = self.get("port"),
577                     agent = True,
578                     identity = self.get("identity"),
579                     server_key = self.get("serverKey")
580                     )
581         
582         return pidtuple
583
584     def status(self, pid, ppid):
585         if self.localhost:
586             status = execfuncs.lstatus(pid, ppid)
587         else:
588             with self._node_lock:
589                 status = sshfuncs.rstatus(
590                         pid, ppid,
591                         host = self.get("hostname"),
592                         user = self.get("username"),
593                         port = self.get("port"),
594                         agent = True,
595                         identity = self.get("identity"),
596                         server_key = self.get("serverKey")
597                         )
598            
599         return status
600     
601     def kill(self, pid, ppid, sudo = False):
602         out = err = ""
603         proc = None
604         status = self.status(pid, ppid)
605
606         if status == sshfuncs.ProcStatus.RUNNING:
607             if self.localhost:
608                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
609             else:
610                 with self._node_lock:
611                     (out, err), proc = sshfuncs.rkill(
612                         pid, ppid,
613                         host = self.get("hostname"),
614                         user = self.get("username"),
615                         port = self.get("port"),
616                         agent = True,
617                         sudo = sudo,
618                         identity = self.get("identity"),
619                         server_key = self.get("serverKey")
620                         )
621
622         return (out, err), proc
623
624     def copy(self, src, dst):
625         if self.localhost:
626             (out, err), proc = execfuncs.lcopy(source, dest, 
627                     recursive = True,
628                     strict_host_checking = False)
629         else:
630             (out, err), proc = sshfuncs.rcopy(
631                 src, dst, 
632                 port = self.get("port"),
633                 identity = self.get("identity"),
634                 server_key = self.get("serverKey"),
635                 recursive = True,
636                 strict_host_checking = False)
637
638         return (out, err), proc
639
640     def upload(self, src, dst, text = False, overwrite = True):
641         """ Copy content to destination
642
643            src  content to copy. Can be a local file, directory or a list of files
644
645            dst  destination path on the remote host (remote is always self.host)
646
647            text src is text input, it must be stored into a temp file before uploading
648         """
649         # If source is a string input 
650         f = None
651         if text and not os.path.isfile(src):
652             # src is text input that should be uploaded as file
653             # create a temporal file with the content to upload
654             f = tempfile.NamedTemporaryFile(delete=False)
655             f.write(src)
656             f.close()
657             src = f.name
658
659         # If dst files should not be overwritten, check that the files do not
660         # exits already 
661         if overwrite == False:
662             src = self.filter_existing_files(src, dst)
663             if not src:
664                 return ("", ""), None 
665
666         if not self.localhost:
667             # Build destination as <user>@<server>:<path>
668             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
669
670         result = self.copy(src, dst)
671
672         # clean up temp file
673         if f:
674             os.remove(f.name)
675
676         return result
677
678     def download(self, src, dst):
679         if not self.localhost:
680             # Build destination as <user>@<server>:<path>
681             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
682         return self.copy(src, dst)
683
684     def install_packages_command(self, packages):
685         command = ""
686         if self.use_rpm:
687             command = rpmfuncs.install_packages_command(self.os, packages)
688         elif self.use_deb:
689             command = debfuncs.install_packages_command(self.os, packages)
690         else:
691             msg = "Error installing packages ( OS not known ) "
692             self.error(msg, self.os)
693             raise RuntimeError, msg
694
695         return command
696
697     def install_packages(self, packages, home, run_home = None):
698         """ Install packages in the Linux host.
699
700         'home' is the directory to upload the package installation script.
701         'run_home' is the directory from where to execute the script.
702         """
703         command = self.install_packages_command(packages)
704
705         run_home = run_home or home
706
707         (out, err), proc = self.run_and_wait(command, run_home, 
708             shfile = os.path.join(home, "instpkg.sh"),
709             pidfile = "instpkg_pidfile",
710             ecodefile = "instpkg_exitcode",
711             stdout = "instpkg_stdout", 
712             stderr = "instpkg_stderr",
713             overwrite = False,
714             raise_on_error = True)
715
716         return (out, err), proc 
717
718     def remove_packages(self, packages, home, run_home = None):
719         """ Uninstall packages from the Linux host.
720
721         'home' is the directory to upload the package un-installation script.
722         'run_home' is the directory from where to execute the script.
723         """
724         if self.use_rpm:
725             command = rpmfuncs.remove_packages_command(self.os, packages)
726         elif self.use_deb:
727             command = debfuncs.remove_packages_command(self.os, packages)
728         else:
729             msg = "Error removing packages ( OS not known ) "
730             self.error(msg)
731             raise RuntimeError, msg
732
733         run_home = run_home or home
734
735         (out, err), proc = self.run_and_wait(command, run_home, 
736             shfile = os.path.join(home, "rmpkg.sh"),
737             pidfile = "rmpkg_pidfile",
738             ecodefile = "rmpkg_exitcode",
739             stdout = "rmpkg_stdout", 
740             stderr = "rmpkg_stderr",
741             overwrite = False,
742             raise_on_error = True)
743          
744         return (out, err), proc 
745
746     def mkdir(self, path, clean = False):
747         if clean:
748             self.rmdir(path)
749
750         return self.execute("mkdir -p %s" % path, with_lock = True)
751
752     def rmdir(self, path):
753         return self.execute("rm -rf %s" % path, with_lock = True)
754         
755     def run_and_wait(self, command, home, 
756             shfile = "cmd.sh",
757             env = None,
758             overwrite = True,
759             pidfile = "pidfile", 
760             ecodefile = "exitcode", 
761             stdin = None, 
762             stdout = "stdout", 
763             stderr = "stderr", 
764             sudo = False,
765             tty = False,
766             raise_on_error = False):
767         """
768         Uploads the 'command' to a bash script in the host.
769         Then runs the script detached in background in the host, and
770         busy-waites until the script finishes executing.
771         """
772
773         if not shfile.startswith("/"):
774             shfile = os.path.join(home, shfile)
775
776         self.upload_command(command, 
777             shfile = shfile, 
778             ecodefile = ecodefile, 
779             env = env,
780             overwrite = overwrite)
781
782         command = "bash %s" % shfile
783         # run command in background in remote host
784         (out, err), proc = self.run(command, home, 
785                 pidfile = pidfile,
786                 stdin = stdin, 
787                 stdout = stdout, 
788                 stderr = stderr, 
789                 sudo = sudo,
790                 tty = tty)
791
792         # check no errors occurred
793         if proc.poll():
794             msg = " Failed to run command '%s' " % command
795             self.error(msg, out, err)
796             if raise_on_error:
797                 raise RuntimeError, msg
798
799         # Wait for pid file to be generated
800         pid, ppid = self.wait_pid(
801                 home = home, 
802                 pidfile = pidfile, 
803                 raise_on_error = raise_on_error)
804
805         # wait until command finishes to execute
806         self.wait_run(pid, ppid)
807       
808         (eout, err), proc = self.check_errors(home,
809             ecodefile = ecodefile,
810             stderr = stderr)
811
812         # Out is what was written in the stderr file
813         if err:
814             msg = " Failed to run command '%s' " % command
815             self.error(msg, eout, err)
816
817             if raise_on_error:
818                 raise RuntimeError, msg
819
820         (out, oerr), proc = self.check_output(home, stdout)
821         
822         return (out, err), proc
823
824     def exitcode(self, home, ecodefile = "exitcode"):
825         """
826         Get the exit code of an application.
827         Returns an integer value with the exit code 
828         """
829         (out, err), proc = self.check_output(home, ecodefile)
830
831         # Succeeded to open file, return exit code in the file
832         if proc.wait() == 0:
833             try:
834                 return int(out.strip())
835             except:
836                 # Error in the content of the file!
837                 return ExitCode.CORRUPTFILE
838
839         # No such file or directory
840         if proc.returncode == 1:
841             return ExitCode.FILENOTFOUND
842         
843         # Other error from 'cat'
844         return ExitCode.ERROR
845
846     def upload_command(self, command, 
847             shfile = "cmd.sh",
848             ecodefile = "exitcode",
849             overwrite = True,
850             env = None):
851         """ Saves the command as a bash script file in the remote host, and
852         forces to save the exit code of the command execution to the ecodefile
853         """
854
855         if not (command.strip().endswith(";") or command.strip().endswith("&")):
856             command += ";"
857       
858         # The exit code of the command will be stored in ecodefile
859         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
860                 'command': command,
861                 'ecodefile': ecodefile,
862                 } 
863
864         # Export environment
865         environ = self.format_environment(env)
866
867         # Add environ to command
868         command = environ + command
869
870         return self.upload(command, shfile, text = True, overwrite = overwrite)
871
872     def format_environment(self, env, inline = False):
873         """ Formats the environment variables for a command to be executed
874         either as an inline command
875         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
876         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
877         """
878         if not env: return ""
879
880         # Remove extra white spaces
881         env = re.sub(r'\s+', ' ', env.strip())
882
883         sep = ";" if inline else "\n"
884         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
885
886     def check_errors(self, home, 
887             ecodefile = "exitcode", 
888             stderr = "stderr"):
889         """ Checks whether errors occurred while running a command.
890         It first checks the exit code for the command, and only if the
891         exit code is an error one it returns the error output.
892
893         """
894         proc = None
895         err = ""
896
897         # get exit code saved in the 'exitcode' file
898         ecode = self.exitcode(home, ecodefile)
899
900         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
901             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
902         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
903             # The process returned an error code or didn't exist. 
904             # Check standard error.
905             (err, eerr), proc = self.check_output(home, stderr)
906
907             # If the stderr file was not found, assume nothing bad happened,
908             # and just ignore the error.
909             # (cat returns 1 for error "No such file or directory")
910             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
911                 err = "" 
912             
913         return ("", err), proc
914  
915     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
916         """ Waits until the pid file for the command is generated, 
917             and returns the pid and ppid of the process """
918         pid = ppid = None
919         delay = 1.0
920
921         for i in xrange(2):
922             pidtuple = self.getpid(home = home, pidfile = pidfile)
923             
924             if pidtuple:
925                 pid, ppid = pidtuple
926                 break
927             else:
928                 time.sleep(delay)
929                 delay = delay * 1.5
930         else:
931             msg = " Failed to get pid for pidfile %s/%s " % (
932                     home, pidfile )
933             self.error(msg)
934             
935             if raise_on_error:
936                 raise RuntimeError, msg
937
938         return pid, ppid
939
940     def wait_run(self, pid, ppid, trial = 0):
941         """ wait for a remote process to finish execution """
942         delay = 1.0
943
944         while True:
945             status = self.status(pid, ppid)
946             
947             if status is ProcStatus.FINISHED:
948                 break
949             elif status is not ProcStatus.RUNNING:
950                 delay = delay * 1.5
951                 time.sleep(delay)
952                 # If it takes more than 20 seconds to start, then
953                 # asume something went wrong
954                 if delay > 20:
955                     break
956             else:
957                 # The app is running, just wait...
958                 time.sleep(0.5)
959
960     def check_output(self, home, filename):
961         """ Retrives content of file """
962         (out, err), proc = self.execute("cat %s" % 
963             os.path.join(home, filename), retry = 1, with_lock = True)
964         return (out, err), proc
965
966     def is_alive(self):
967         """ Checks if host is responsive
968         """
969         if self.localhost:
970             return True
971
972         out = err = ""
973         msg = "Unresponsive host. Wrong answer. "
974
975         # The underlying SSH layer will sometimes return an empty
976         # output (even if the command was executed without errors).
977         # To work arround this, repeat the operation N times or
978         # until the result is not empty string
979         try:
980             (out, err), proc = self.execute("echo 'ALIVE'",
981                     blocking = True,
982                     with_lock = True)
983     
984             if out.find("ALIVE") > -1:
985                 return True
986         except:
987             trace = traceback.format_exc()
988             msg = "Unresponsive host. Error reaching host: %s " % trace
989
990         self.error(msg, out, err)
991         return False
992
993     def find_home(self):
994         """ Retrieves host home directory
995         """
996         # The underlying SSH layer will sometimes return an empty
997         # output (even if the command was executed without errors).
998         # To work arround this, repeat the operation N times or
999         # until the result is not empty string
1000         msg = "Impossible to retrieve HOME directory"
1001         try:
1002             (out, err), proc = self.execute("echo ${HOME}",
1003                     blocking = True,
1004                     with_lock = True)
1005     
1006             if out.strip() != "":
1007                 self._home_dir =  out.strip()
1008         except:
1009             trace = traceback.format_exc()
1010             msg = "Impossible to retrieve HOME directory %s" % trace
1011
1012         if not self._home_dir:
1013             self.error(msg)
1014             raise RuntimeError, msg
1015
1016     def filter_existing_files(self, src, dst):
1017         """ Removes files that already exist in the Linux host from src list
1018         """
1019         # construct a dictionary with { dst: src }
1020         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
1021             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1022
1023         command = []
1024         for d in dests.keys():
1025             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1026
1027         command = ";".join(command)
1028
1029         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1030     
1031         for d in dests.keys():
1032             if out.find(d) > -1:
1033                 del dests[d]
1034
1035         if not dests:
1036             return ""
1037
1038         return " ".join(dests.values())
1039