Improvements to UdpTunnel
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
22         reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145
146     @classmethod
147     def _register_attributes(cls):
148         hostname = Attribute("hostname", "Hostname of the machine",
149                 flags = Flags.ExecReadOnly)
150
151         username = Attribute("username", "Local account username", 
152                 flags = Flags.Credential)
153
154         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
155         
156         home = Attribute("home",
157                 "Experiment home directory to store all experiment related files",
158                 flags = Flags.ExecReadOnly)
159         
160         identity = Attribute("identity", "SSH identity file",
161                 flags = Flags.Credential)
162         
163         server_key = Attribute("serverKey", "Server public key", 
164                 flags = Flags.ExecReadOnly)
165         
166         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
167                 " from node home folder before starting experiment", 
168                 flags = Flags.ExecReadOnly)
169
170         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
171                 " from a previous same experiment, before the new experiment starts", 
172                 flags = Flags.ExecReadOnly)
173         
174         clean_processes = Attribute("cleanProcesses", 
175                 "Kill all running processes before starting experiment",
176                 flags = Flags.ExecReadOnly)
177         
178         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
179                 "releasing the resource",
180                 flags = Flags.ExecReadOnly)
181
182         cls._register_attribute(hostname)
183         cls._register_attribute(username)
184         cls._register_attribute(port)
185         cls._register_attribute(home)
186         cls._register_attribute(identity)
187         cls._register_attribute(server_key)
188         cls._register_attribute(clean_home)
189         cls._register_attribute(clean_experiment)
190         cls._register_attribute(clean_processes)
191         cls._register_attribute(tear_down)
192
193     def __init__(self, ec, guid):
194         super(LinuxNode, self).__init__(ec, guid)
195         self._os = None
196         # home directory at Linux host
197         self._home_dir = ""
198         
199         # lock to avoid concurrency issues on methods used by applications 
200         self._lock = threading.Lock()
201     
202     def log_message(self, msg):
203         return " guid %d - host %s - %s " % (self.guid, 
204                 self.get("hostname"), msg)
205
206     @property
207     def home_dir(self):
208         home = self.get("home") or ""
209         if not home.startswith("/"):
210            home = os.path.join(self._home_dir, home) 
211         return home
212
213     @property
214     def usr_dir(self):
215         return os.path.join(self.home_dir, "nepi-usr")
216
217     @property
218     def lib_dir(self):
219         return os.path.join(self.usr_dir, "lib")
220
221     @property
222     def bin_dir(self):
223         return os.path.join(self.usr_dir, "bin")
224
225     @property
226     def src_dir(self):
227         return os.path.join(self.usr_dir, "src")
228
229     @property
230     def share_dir(self):
231         return os.path.join(self.usr_dir, "share")
232
233     @property
234     def exp_dir(self):
235         return os.path.join(self.home_dir, "nepi-exp")
236
237     @property
238     def exp_home(self):
239         return os.path.join(self.exp_dir, self.ec.exp_id)
240
241     @property
242     def node_home(self):
243         return os.path.join(self.exp_home, "node-%d" % self.guid)
244
245     @property
246     def run_home(self):
247         return os.path.join(self.node_home, self.ec.run_id)
248
249     @property
250     def os(self):
251         if self._os:
252             return self._os
253
254         if (not self.get("hostname") or not self.get("username")):
255             msg = "Can't resolve OS, insufficient data "
256             self.error(msg)
257             raise RuntimeError, msg
258
259         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
260
261         if err and proc.poll():
262             msg = "Error detecting OS "
263             self.error(msg, out, err)
264             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
265
266         if out.find("Fedora release 8") == 0:
267             self._os = OSType.FEDORA_8
268         elif out.find("Fedora release 12") == 0:
269             self._os = OSType.FEDORA_12
270         elif out.find("Fedora release 14") == 0:
271             self._os = OSType.FEDORA_14
272         elif out.find("Debian") == 0: 
273             self._os = OSType.DEBIAN
274         elif out.find("Ubuntu") ==0:
275             self._os = OSType.UBUNTU
276         else:
277             msg = "Unsupported OS"
278             self.error(msg, out)
279             raise RuntimeError, "%s - %s " %( msg, out )
280
281         return self._os
282
283     @property
284     def use_deb(self):
285         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
286
287     @property
288     def use_rpm(self):
289         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
290                 OSType.FEDORA]
291
292     @property
293     def localhost(self):
294         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
295
296     def provision(self):
297         # check if host is alive
298         if not self.is_alive():
299             self.fail()
300             
301             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
302             self.error(msg)
303             raise RuntimeError, msg
304
305         self.find_home()
306
307         if self.get("cleanProcesses"):
308             self.clean_processes()
309
310         if self.get("cleanHome"):
311             self.clean_home()
312  
313         if self.get("cleanExperiment"):
314             self.clean_experiment()
315     
316         # Create shared directory structure
317         self.mkdir(self.lib_dir)
318         self.mkdir(self.bin_dir)
319         self.mkdir(self.src_dir)
320         self.mkdir(self.share_dir)
321
322         # Create experiment node home directory
323         self.mkdir(self.node_home)
324
325         super(LinuxNode, self).provision()
326
327     def deploy(self):
328         if self.state == ResourceState.NEW:
329             try:
330                 self.discover()
331                 self.provision()
332             except:
333                 self._state = ResourceState.FAILED
334                 raise
335
336         # Node needs to wait until all associated interfaces are 
337         # ready before it can finalize deployment
338         from nepi.resources.linux.interface import LinuxInterface
339         ifaces = self.get_connected(LinuxInterface.rtype())
340         for iface in ifaces:
341             if iface.state < ResourceState.READY:
342                 self.ec.schedule(reschedule_delay, self.deploy)
343                 return 
344
345         super(LinuxNode, self).deploy()
346
347     def release(self):
348         # Node needs to wait until all associated RMs are released
349         # to be released
350         rms = self.get_connected()
351         for rm in rms:
352             if rm.state < ResourceState.STOPPED:
353                 self.ec.schedule(reschedule_delay, self.release)
354                 return 
355
356         tear_down = self.get("tearDown")
357         if tear_down:
358             self.execute(tear_down)
359
360         self.clean_processes()
361
362         super(LinuxNode, self).release()
363
364     def valid_connection(self, guid):
365         # TODO: Validate!
366         return True
367
368     def clean_processes(self, killer = False):
369         self.info("Cleaning up processes")
370         
371         if killer:
372             # Hardcore kill
373             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
374                 "sudo -S killall python tcpdump || /bin/true ; " +
375                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
376                 "sudo -S killall -u root || /bin/true ; " +
377                 "sudo -S killall -u root || /bin/true ; ")
378         else:
379             # Be gentler...
380             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
381                 "sudo -S killall tcpdump || /bin/true ; " +
382                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
383                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
384
385         out = err = ""
386         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
387             
388     def clean_home(self):
389         """ Cleans all NEPI related folders in the Linux host
390         """
391         self.info("Cleaning up home")
392         
393         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
394                 self.home_dir )
395
396         return self.execute(cmd, with_lock = True)
397
398     def clean_experiment(self):
399         """ Cleans all experiment related files in the Linux host.
400         It preserves NEPI files and folders that have a multi experiment
401         scope.
402         """
403         self.info("Cleaning up experiment files")
404         
405         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
406                 self.exp_dir,
407                 self.ec.exp_id )
408             
409         return self.execute(cmd, with_lock = True)
410
411     def execute(self, command,
412             sudo = False,
413             stdin = None, 
414             env = None,
415             tty = False,
416             forward_x11 = False,
417             timeout = None,
418             retry = 3,
419             err_on_timeout = True,
420             connect_timeout = 30,
421             strict_host_checking = False,
422             persistent = True,
423             blocking = True,
424             with_lock = False
425             ):
426         """ Notice that this invocation will block until the
427         execution finishes. If this is not the desired behavior,
428         use 'run' instead."""
429
430         if self.localhost:
431             (out, err), proc = execfuncs.lexec(command, 
432                     user = user,
433                     sudo = sudo,
434                     stdin = stdin,
435                     env = env)
436         else:
437             if with_lock:
438                 with self._lock:
439                     (out, err), proc = sshfuncs.rexec(
440                         command, 
441                         host = self.get("hostname"),
442                         user = self.get("username"),
443                         port = self.get("port"),
444                         agent = True,
445                         sudo = sudo,
446                         stdin = stdin,
447                         identity = self.get("identity"),
448                         server_key = self.get("serverKey"),
449                         env = env,
450                         tty = tty,
451                         forward_x11 = forward_x11,
452                         timeout = timeout,
453                         retry = retry,
454                         err_on_timeout = err_on_timeout,
455                         connect_timeout = connect_timeout,
456                         persistent = persistent,
457                         blocking = blocking, 
458                         strict_host_checking = strict_host_checking
459                         )
460             else:
461                 (out, err), proc = sshfuncs.rexec(
462                     command, 
463                     host = self.get("hostname"),
464                     user = self.get("username"),
465                     port = self.get("port"),
466                     agent = True,
467                     sudo = sudo,
468                     stdin = stdin,
469                     identity = self.get("identity"),
470                     server_key = self.get("serverKey"),
471                     env = env,
472                     tty = tty,
473                     forward_x11 = forward_x11,
474                     timeout = timeout,
475                     retry = retry,
476                     err_on_timeout = err_on_timeout,
477                     connect_timeout = connect_timeout,
478                     persistent = persistent,
479                     blocking = blocking, 
480                     strict_host_checking = strict_host_checking
481                     )
482
483         return (out, err), proc
484
485     def run(self, command, home,
486             create_home = False,
487             pidfile = 'pidfile',
488             stdin = None, 
489             stdout = 'stdout', 
490             stderr = 'stderr', 
491             sudo = False,
492             tty = False):
493         
494         self.debug("Running command '%s'" % command)
495         
496         if self.localhost:
497             (out, err), proc = execfuncs.lspawn(command, pidfile, 
498                     stdout = stdout, 
499                     stderr = stderr, 
500                     stdin = stdin, 
501                     home = home, 
502                     create_home = create_home, 
503                     sudo = sudo,
504                     user = user) 
505         else:
506             with self._lock:
507                 (out, err), proc = sshfuncs.rspawn(
508                     command,
509                     pidfile = pidfile,
510                     home = home,
511                     create_home = create_home,
512                     stdin = stdin if stdin is not None else '/dev/null',
513                     stdout = stdout if stdout else '/dev/null',
514                     stderr = stderr if stderr else '/dev/null',
515                     sudo = sudo,
516                     host = self.get("hostname"),
517                     user = self.get("username"),
518                     port = self.get("port"),
519                     agent = True,
520                     identity = self.get("identity"),
521                     server_key = self.get("serverKey"),
522                     tty = tty
523                     )
524
525         return (out, err), proc
526
527     def getpid(self, home, pidfile = "pidfile"):
528         if self.localhost:
529             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
530         else:
531             with self._lock:
532                 pidtuple = sshfuncs.rgetpid(
533                     os.path.join(home, pidfile),
534                     host = self.get("hostname"),
535                     user = self.get("username"),
536                     port = self.get("port"),
537                     agent = True,
538                     identity = self.get("identity"),
539                     server_key = self.get("serverKey")
540                     )
541         
542         return pidtuple
543
544     def status(self, pid, ppid):
545         if self.localhost:
546             status = execfuncs.lstatus(pid, ppid)
547         else:
548             with self._lock:
549                 status = sshfuncs.rstatus(
550                         pid, ppid,
551                         host = self.get("hostname"),
552                         user = self.get("username"),
553                         port = self.get("port"),
554                         agent = True,
555                         identity = self.get("identity"),
556                         server_key = self.get("serverKey")
557                         )
558            
559         return status
560     
561     def kill(self, pid, ppid, sudo = False):
562         out = err = ""
563         proc = None
564         status = self.status(pid, ppid)
565
566         if status == sshfuncs.ProcStatus.RUNNING:
567             if self.localhost:
568                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
569             else:
570                 with self._lock:
571                     (out, err), proc = sshfuncs.rkill(
572                         pid, ppid,
573                         host = self.get("hostname"),
574                         user = self.get("username"),
575                         port = self.get("port"),
576                         agent = True,
577                         sudo = sudo,
578                         identity = self.get("identity"),
579                         server_key = self.get("serverKey")
580                         )
581
582         return (out, err), proc
583
584     def copy(self, src, dst):
585         if self.localhost:
586             (out, err), proc = execfuncs.lcopy(source, dest, 
587                     recursive = True,
588                     strict_host_checking = False)
589         else:
590             with self._lock:
591                 (out, err), proc = sshfuncs.rcopy(
592                     src, dst, 
593                     port = self.get("port"),
594                     identity = self.get("identity"),
595                     server_key = self.get("serverKey"),
596                     recursive = True,
597                     strict_host_checking = False)
598
599         return (out, err), proc
600
601
602     def upload(self, src, dst, text = False, overwrite = True):
603         """ Copy content to destination
604
605            src  content to copy. Can be a local file, directory or a list of files
606
607            dst  destination path on the remote host (remote is always self.host)
608
609            text src is text input, it must be stored into a temp file before uploading
610         """
611         # If source is a string input 
612         f = None
613         if text and not os.path.isfile(src):
614             # src is text input that should be uploaded as file
615             # create a temporal file with the content to upload
616             f = tempfile.NamedTemporaryFile(delete=False)
617             f.write(src)
618             f.close()
619             src = f.name
620
621         # If dst files should not be overwritten, check that the files do not
622         # exits already 
623         if overwrite == False:
624             src = self.filter_existing_files(src, dst)
625             if not src:
626                 return ("", ""), None 
627
628         if not self.localhost:
629             # Build destination as <user>@<server>:<path>
630             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
631
632         result = self.copy(src, dst)
633
634         # clean up temp file
635         if f:
636             os.remove(f.name)
637
638         return result
639
640     def download(self, src, dst):
641         if not self.localhost:
642             # Build destination as <user>@<server>:<path>
643             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
644         return self.copy(src, dst)
645
646     def install_packages(self, packages, home, run_home = None):
647         """ Install packages in the Linux host.
648
649         'home' is the directory to upload the package installation script.
650         'run_home' is the directory from where to execute the script.
651         """
652         command = ""
653         if self.use_rpm:
654             command = rpmfuncs.install_packages_command(self.os, packages)
655         elif self.use_deb:
656             command = debfuncs.install_packages_command(self.os, packages)
657         else:
658             msg = "Error installing packages ( OS not known ) "
659             self.error(msg, self.os)
660             raise RuntimeError, msg
661
662         run_home = run_home or home
663
664         (out, err), proc = self.run_and_wait(command, run_home, 
665             shfile = os.path.join(home, "instpkg.sh"),
666             pidfile = "instpkg_pidfile",
667             ecodefile = "instpkg_exitcode",
668             stdout = "instpkg_stdout", 
669             stderr = "instpkg_stderr",
670             overwrite = False,
671             raise_on_error = True)
672
673         return (out, err), proc 
674
675     def remove_packages(self, packages, home, run_home = None):
676         """ Uninstall packages from the Linux host.
677
678         'home' is the directory to upload the package un-installation script.
679         'run_home' is the directory from where to execute the script.
680         """
681         if self.use_rpm:
682             command = rpmfuncs.remove_packages_command(self.os, packages)
683         elif self.use_deb:
684             command = debfuncs.remove_packages_command(self.os, packages)
685         else:
686             msg = "Error removing packages ( OS not known ) "
687             self.error(msg)
688             raise RuntimeError, msg
689
690         run_home = run_home or home
691
692         (out, err), proc = self.run_and_wait(command, run_home, 
693             shfile = os.path.join(home, "rmpkg.sh"),
694             pidfile = "rmpkg_pidfile",
695             ecodefile = "rmpkg_exitcode",
696             stdout = "rmpkg_stdout", 
697             stderr = "rmpkg_stderr",
698             overwrite = False,
699             raise_on_error = True)
700          
701         return (out, err), proc 
702
703     def mkdir(self, path, clean = False):
704         if clean:
705             self.rmdir(path)
706
707         return self.execute("mkdir -p %s" % path, with_lock = True)
708
709     def rmdir(self, path):
710         return self.execute("rm -rf %s" % path, with_lock = True)
711         
712     def run_and_wait(self, command, home, 
713             shfile = "cmd.sh",
714             env = None,
715             overwrite = True,
716             pidfile = "pidfile", 
717             ecodefile = "exitcode", 
718             stdin = None, 
719             stdout = "stdout", 
720             stderr = "stderr", 
721             sudo = False,
722             tty = False,
723             raise_on_error = False):
724         """
725         Uploads the 'command' to a bash script in the host.
726         Then runs the script detached in background in the host, and
727         busy-waites until the script finishes executing.
728         """
729
730         if not shfile.startswith("/"):
731             shfile = os.path.join(home, shfile)
732
733         self.upload_command(command, 
734             shfile = shfile, 
735             ecodefile = ecodefile, 
736             env = env,
737             overwrite = overwrite)
738
739         command = "bash %s" % shfile
740         # run command in background in remote host
741         (out, err), proc = self.run(command, home, 
742                 pidfile = pidfile,
743                 stdin = stdin, 
744                 stdout = stdout, 
745                 stderr = stderr, 
746                 sudo = sudo,
747                 tty = tty)
748
749         # check no errors occurred
750         if proc.poll():
751             msg = " Failed to run command '%s' " % command
752             self.error(msg, out, err)
753             if raise_on_error:
754                 raise RuntimeError, msg
755
756         # Wait for pid file to be generated
757         pid, ppid = self.wait_pid(
758                 home = home, 
759                 pidfile = pidfile, 
760                 raise_on_error = raise_on_error)
761
762         # wait until command finishes to execute
763         self.wait_run(pid, ppid)
764       
765         (eout, err), proc = self.check_errors(home,
766             ecodefile = ecodefile,
767             stderr = stderr)
768
769         # Out is what was written in the stderr file
770         if err:
771             msg = " Failed to run command '%s' " % command
772             self.error(msg, eout, err)
773
774             if raise_on_error:
775                 raise RuntimeError, msg
776
777         (out, oerr), proc = self.check_output(home, stdout)
778         
779         return (out, err), proc
780
781     def exitcode(self, home, ecodefile = "exitcode"):
782         """
783         Get the exit code of an application.
784         Returns an integer value with the exit code 
785         """
786         (out, err), proc = self.check_output(home, ecodefile)
787
788         # Succeeded to open file, return exit code in the file
789         if proc.wait() == 0:
790             try:
791                 return int(out.strip())
792             except:
793                 # Error in the content of the file!
794                 return ExitCode.CORRUPTFILE
795
796         # No such file or directory
797         if proc.returncode == 1:
798             return ExitCode.FILENOTFOUND
799         
800         # Other error from 'cat'
801         return ExitCode.ERROR
802
803     def upload_command(self, command, 
804             shfile = "cmd.sh",
805             ecodefile = "exitcode",
806             overwrite = True,
807             env = None):
808         """ Saves the command as a bash script file in the remote host, and
809         forces to save the exit code of the command execution to the ecodefile
810         """
811
812         if not (command.strip().endswith(";") or command.strip().endswith("&")):
813             command += ";"
814       
815         # The exit code of the command will be stored in ecodefile
816         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
817                 'command': command,
818                 'ecodefile': ecodefile,
819                 } 
820
821         # Export environment
822         environ = self.format_environment(env)
823
824         # Add environ to command
825         command = environ + command
826
827         return self.upload(command, shfile, text = True, overwrite = overwrite)
828
829     def format_environment(self, env, inline = False):
830         """ Formats the environment variables for a command to be executed
831         either as an inline command
832         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
833         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
834         """
835         if not env: return ""
836
837         # Remove extra white spaces
838         env = re.sub(r'\s+', ' ', env.strip())
839
840         sep = ";" if inline else "\n"
841         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
842
843     def check_errors(self, home, 
844             ecodefile = "exitcode", 
845             stderr = "stderr"):
846         """ Checks whether errors occurred while running a command.
847         It first checks the exit code for the command, and only if the
848         exit code is an error one it returns the error output.
849
850         """
851         proc = None
852         err = ""
853
854         # get exit code saved in the 'exitcode' file
855         ecode = self.exitcode(home, ecodefile)
856
857         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
858             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
859         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
860             # The process returned an error code or didn't exist. 
861             # Check standard error.
862             (err, eerr), proc = self.check_output(home, stderr)
863
864             # If the stderr file was not found, assume nothing bad happened,
865             # and just ignore the error.
866             # (cat returns 1 for error "No such file or directory")
867             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
868                 err = "" 
869             
870         return ("", err), proc
871  
872     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
873         """ Waits until the pid file for the command is generated, 
874             and returns the pid and ppid of the process """
875         pid = ppid = None
876         delay = 1.0
877
878         for i in xrange(4):
879             pidtuple = self.getpid(home = home, pidfile = pidfile)
880             
881             if pidtuple:
882                 pid, ppid = pidtuple
883                 break
884             else:
885                 time.sleep(delay)
886                 delay = delay * 1.5
887         else:
888             msg = " Failed to get pid for pidfile %s/%s " % (
889                     home, pidfile )
890             self.error(msg)
891             
892             if raise_on_error:
893                 raise RuntimeError, msg
894
895         return pid, ppid
896
897     def wait_run(self, pid, ppid, trial = 0):
898         """ wait for a remote process to finish execution """
899         start_delay = 1.0
900
901         while True:
902             status = self.status(pid, ppid)
903             
904             if status is ProcStatus.FINISHED:
905                 break
906             elif status is not ProcStatus.RUNNING:
907                 delay = delay * 1.5
908                 time.sleep(delay)
909                 # If it takes more than 20 seconds to start, then
910                 # asume something went wrong
911                 if delay > 20:
912                     break
913             else:
914                 # The app is running, just wait...
915                 time.sleep(0.5)
916
917     def check_output(self, home, filename):
918         """ Retrives content of file """
919         (out, err), proc = self.execute("cat %s" % 
920             os.path.join(home, filename), retry = 1, with_lock = True)
921         return (out, err), proc
922
923     def is_alive(self):
924         """ Checks if host is responsive
925         """
926         if self.localhost:
927             return True
928
929         out = err = ""
930         try:
931             (out, err), proc = self.execute("echo 'ALIVE'",
932                     retry = 5, 
933                     with_lock = True)
934         except:
935             trace = traceback.format_exc()
936             msg = "Unresponsive host  %s " % err
937             self.error(msg, out, trace)
938             return False
939
940         if out.strip() == "ALIVE":
941             return True
942         else:
943             msg = "Unresponsive host "
944             self.error(msg, out, err)
945             return False
946
947     def find_home(self):
948         """ Retrieves host home directory
949         """
950         (out, err), proc = self.execute("echo ${HOME}", retry = 5, 
951                     with_lock = True)
952
953         if proc.poll():
954             msg = "Imposible to retrieve HOME directory"
955             self.error(msg, out, err)
956             raise RuntimeError, msg
957
958         self._home_dir =  out.strip()
959
960     def filter_existing_files(self, src, dst):
961         """ Removes files that already exist in the Linux host from src list
962         """
963         # construct a dictionary with { dst: src }
964         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
965             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
966
967         command = []
968         for d in dests.keys():
969             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
970
971         command = ";".join(command)
972
973         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
974     
975         for d in dests.keys():
976             if out.find(d) > -1:
977                 del dests[d]
978
979         if not dests:
980             return ""
981
982         return " ".join(dests.values())
983