popolate_factory no longer requires to be invoked explicitly by the user
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
22         reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145
146     @classmethod
147     def _register_attributes(cls):
148         hostname = Attribute("hostname", "Hostname of the machine",
149                 flags = Flags.ExecReadOnly)
150
151         username = Attribute("username", "Local account username", 
152                 flags = Flags.Credential)
153
154         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
155         
156         home = Attribute("home",
157                 "Experiment home directory to store all experiment related files",
158                 flags = Flags.ExecReadOnly)
159         
160         identity = Attribute("identity", "SSH identity file",
161                 flags = Flags.Credential)
162         
163         server_key = Attribute("serverKey", "Server public key", 
164                 flags = Flags.ExecReadOnly)
165         
166         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
167                 " from node home folder before starting experiment", 
168                 flags = Flags.ExecReadOnly)
169
170         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
171                 " from a previous same experiment, before the new experiment starts", 
172                 flags = Flags.ExecReadOnly)
173         
174         clean_processes = Attribute("cleanProcesses", 
175                 "Kill all running processes before starting experiment",
176                 flags = Flags.ExecReadOnly)
177         
178         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
179                 "releasing the resource",
180                 flags = Flags.ExecReadOnly)
181
182         cls._register_attribute(hostname)
183         cls._register_attribute(username)
184         cls._register_attribute(port)
185         cls._register_attribute(home)
186         cls._register_attribute(identity)
187         cls._register_attribute(server_key)
188         cls._register_attribute(clean_home)
189         cls._register_attribute(clean_experiment)
190         cls._register_attribute(clean_processes)
191         cls._register_attribute(tear_down)
192
193     def __init__(self, ec, guid):
194         super(LinuxNode, self).__init__(ec, guid)
195         self._os = None
196         # home directory at Linux host
197         self._home_dir = ""
198         
199         # lock to avoid concurrency issues on methods used by applications 
200         self._lock = threading.Lock()
201     
202     def log_message(self, msg):
203         return " guid %d - host %s - %s " % (self.guid, 
204                 self.get("hostname"), msg)
205
206     @property
207     def home_dir(self):
208         home = self.get("home") or ""
209         if not home.startswith("/"):
210            home = os.path.join(self._home_dir, home) 
211         return home
212
213     @property
214     def usr_dir(self):
215         return os.path.join(self.home_dir, "nepi-usr")
216
217     @property
218     def lib_dir(self):
219         return os.path.join(self.usr_dir, "lib")
220
221     @property
222     def bin_dir(self):
223         return os.path.join(self.usr_dir, "bin")
224
225     @property
226     def src_dir(self):
227         return os.path.join(self.usr_dir, "src")
228
229     @property
230     def share_dir(self):
231         return os.path.join(self.usr_dir, "share")
232
233     @property
234     def exp_dir(self):
235         return os.path.join(self.home_dir, "nepi-exp")
236
237     @property
238     def exp_home(self):
239         return os.path.join(self.exp_dir, self.ec.exp_id)
240
241     @property
242     def node_home(self):
243         return os.path.join(self.exp_home, "node-%d" % self.guid)
244
245     @property
246     def run_home(self):
247         return os.path.join(self.node_home, self.ec.run_id)
248
249     @property
250     def os(self):
251         if self._os:
252             return self._os
253
254         if (not self.get("hostname") or not self.get("username")):
255             msg = "Can't resolve OS, insufficient data "
256             self.error(msg)
257             raise RuntimeError, msg
258
259         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
260
261         if err and proc.poll():
262             msg = "Error detecting OS "
263             self.error(msg, out, err)
264             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
265
266         if out.find("Fedora release 8") == 0:
267             self._os = OSType.FEDORA_8
268         elif out.find("Fedora release 12") == 0:
269             self._os = OSType.FEDORA_12
270         elif out.find("Fedora release 14") == 0:
271             self._os = OSType.FEDORA_14
272         elif out.find("Debian") == 0: 
273             self._os = OSType.DEBIAN
274         elif out.find("Ubuntu") ==0:
275             self._os = OSType.UBUNTU
276         else:
277             msg = "Unsupported OS"
278             self.error(msg, out)
279             raise RuntimeError, "%s - %s " %( msg, out )
280
281         return self._os
282
283     @property
284     def use_deb(self):
285         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
286
287     @property
288     def use_rpm(self):
289         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
290                 OSType.FEDORA]
291
292     @property
293     def localhost(self):
294         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
295
296     def provision(self):
297         # check if host is alive
298         if not self.is_alive():
299             self.fail()
300             
301             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
302             self.error(msg)
303             raise RuntimeError, msg
304
305         self.find_home()
306
307         if self.get("cleanProcesses"):
308             self.clean_processes()
309
310         if self.get("cleanHome"):
311             self.clean_home()
312  
313         if self.get("cleanExperiment"):
314             self.clean_experiment()
315     
316         # Create shared directory structure
317         self.mkdir(self.lib_dir)
318         self.mkdir(self.bin_dir)
319         self.mkdir(self.src_dir)
320         self.mkdir(self.share_dir)
321
322         # Create experiment node home directory
323         self.mkdir(self.node_home)
324
325         super(LinuxNode, self).provision()
326
327     def deploy(self):
328         if self.state == ResourceState.NEW:
329             try:
330                 self.discover()
331                 self.provision()
332             except:
333                 self._state = ResourceState.FAILED
334                 raise
335
336         # Node needs to wait until all associated interfaces are 
337         # ready before it can finalize deployment
338         from nepi.resources.linux.interface import LinuxInterface
339         ifaces = self.get_connected(LinuxInterface.rtype())
340         for iface in ifaces:
341             if iface.state < ResourceState.READY:
342                 self.ec.schedule(reschedule_delay, self.deploy)
343                 return 
344
345         super(LinuxNode, self).deploy()
346
347     def release(self):
348         tear_down = self.get("tearDown")
349         if tear_down:
350             self.execute(tear_down)
351
352         self.clean_processes()
353
354         super(LinuxNode, self).release()
355
356     def valid_connection(self, guid):
357         # TODO: Validate!
358         return True
359
360     def clean_processes(self, killer = False):
361         self.info("Cleaning up processes")
362         
363         if killer:
364             # Hardcore kill
365             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
366                 "sudo -S killall python tcpdump || /bin/true ; " +
367                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
368                 "sudo -S killall -u root || /bin/true ; " +
369                 "sudo -S killall -u root || /bin/true ; ")
370         else:
371             # Be gentler...
372             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
373                 "sudo -S killall tcpdump || /bin/true ; " +
374                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
375                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
376
377         out = err = ""
378         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
379             
380     def clean_home(self):
381         """ Cleans all NEPI related folders in the Linux host
382         """
383         self.info("Cleaning up home")
384         
385         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
386                 self.home_dir )
387
388         return self.execute(cmd, with_lock = True)
389
390     def clean_experiment(self):
391         """ Cleans all experiment related files in the Linux host.
392         It preserves NEPI files and folders that have a multi experiment
393         scope.
394         """
395         self.info("Cleaning up experiment files")
396         
397         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
398                 self.exp_dir,
399                 self.ec.exp_id )
400             
401         return self.execute(cmd, with_lock = True)
402
403     def execute(self, command,
404             sudo = False,
405             stdin = None, 
406             env = None,
407             tty = False,
408             forward_x11 = False,
409             timeout = None,
410             retry = 3,
411             err_on_timeout = True,
412             connect_timeout = 30,
413             strict_host_checking = False,
414             persistent = True,
415             blocking = True,
416             with_lock = False
417             ):
418         """ Notice that this invocation will block until the
419         execution finishes. If this is not the desired behavior,
420         use 'run' instead."""
421
422         if self.localhost:
423             (out, err), proc = execfuncs.lexec(command, 
424                     user = user,
425                     sudo = sudo,
426                     stdin = stdin,
427                     env = env)
428         else:
429             if with_lock:
430                 with self._lock:
431                     (out, err), proc = sshfuncs.rexec(
432                         command, 
433                         host = self.get("hostname"),
434                         user = self.get("username"),
435                         port = self.get("port"),
436                         agent = True,
437                         sudo = sudo,
438                         stdin = stdin,
439                         identity = self.get("identity"),
440                         server_key = self.get("serverKey"),
441                         env = env,
442                         tty = tty,
443                         forward_x11 = forward_x11,
444                         timeout = timeout,
445                         retry = retry,
446                         err_on_timeout = err_on_timeout,
447                         connect_timeout = connect_timeout,
448                         persistent = persistent,
449                         blocking = blocking, 
450                         strict_host_checking = strict_host_checking
451                         )
452             else:
453                 (out, err), proc = sshfuncs.rexec(
454                     command, 
455                     host = self.get("hostname"),
456                     user = self.get("username"),
457                     port = self.get("port"),
458                     agent = True,
459                     sudo = sudo,
460                     stdin = stdin,
461                     identity = self.get("identity"),
462                     server_key = self.get("serverKey"),
463                     env = env,
464                     tty = tty,
465                     forward_x11 = forward_x11,
466                     timeout = timeout,
467                     retry = retry,
468                     err_on_timeout = err_on_timeout,
469                     connect_timeout = connect_timeout,
470                     persistent = persistent,
471                     blocking = blocking, 
472                     strict_host_checking = strict_host_checking
473                     )
474
475         return (out, err), proc
476
477     def run(self, command, home,
478             create_home = False,
479             pidfile = 'pidfile',
480             stdin = None, 
481             stdout = 'stdout', 
482             stderr = 'stderr', 
483             sudo = False,
484             tty = False):
485         
486         self.debug("Running command '%s'" % command)
487         
488         if self.localhost:
489             (out, err), proc = execfuncs.lspawn(command, pidfile, 
490                     stdout = stdout, 
491                     stderr = stderr, 
492                     stdin = stdin, 
493                     home = home, 
494                     create_home = create_home, 
495                     sudo = sudo,
496                     user = user) 
497         else:
498             with self._lock:
499                 (out, err), proc = sshfuncs.rspawn(
500                     command,
501                     pidfile = pidfile,
502                     home = home,
503                     create_home = create_home,
504                     stdin = stdin if stdin is not None else '/dev/null',
505                     stdout = stdout if stdout else '/dev/null',
506                     stderr = stderr if stderr else '/dev/null',
507                     sudo = sudo,
508                     host = self.get("hostname"),
509                     user = self.get("username"),
510                     port = self.get("port"),
511                     agent = True,
512                     identity = self.get("identity"),
513                     server_key = self.get("serverKey"),
514                     tty = tty
515                     )
516
517         return (out, err), proc
518
519     def getpid(self, home, pidfile = "pidfile"):
520         if self.localhost:
521             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
522         else:
523             with self._lock:
524                 pidtuple = sshfuncs.rgetpid(
525                     os.path.join(home, pidfile),
526                     host = self.get("hostname"),
527                     user = self.get("username"),
528                     port = self.get("port"),
529                     agent = True,
530                     identity = self.get("identity"),
531                     server_key = self.get("serverKey")
532                     )
533         
534         return pidtuple
535
536     def status(self, pid, ppid):
537         if self.localhost:
538             status = execfuncs.lstatus(pid, ppid)
539         else:
540             with self._lock:
541                 status = sshfuncs.rstatus(
542                         pid, ppid,
543                         host = self.get("hostname"),
544                         user = self.get("username"),
545                         port = self.get("port"),
546                         agent = True,
547                         identity = self.get("identity"),
548                         server_key = self.get("serverKey")
549                         )
550            
551         return status
552     
553     def kill(self, pid, ppid, sudo = False):
554         out = err = ""
555         proc = None
556         status = self.status(pid, ppid)
557
558         if status == sshfuncs.ProcStatus.RUNNING:
559             if self.localhost:
560                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
561             else:
562                 with self._lock:
563                     (out, err), proc = sshfuncs.rkill(
564                         pid, ppid,
565                         host = self.get("hostname"),
566                         user = self.get("username"),
567                         port = self.get("port"),
568                         agent = True,
569                         sudo = sudo,
570                         identity = self.get("identity"),
571                         server_key = self.get("serverKey")
572                         )
573
574         return (out, err), proc
575
576     def copy(self, src, dst):
577         if self.localhost:
578             (out, err), proc = execfuncs.lcopy(source, dest, 
579                     recursive = True,
580                     strict_host_checking = False)
581         else:
582             with self._lock:
583                 (out, err), proc = sshfuncs.rcopy(
584                     src, dst, 
585                     port = self.get("port"),
586                     identity = self.get("identity"),
587                     server_key = self.get("serverKey"),
588                     recursive = True,
589                     strict_host_checking = False)
590
591         return (out, err), proc
592
593
594     def upload(self, src, dst, text = False, overwrite = True):
595         """ Copy content to destination
596
597            src  content to copy. Can be a local file, directory or a list of files
598
599            dst  destination path on the remote host (remote is always self.host)
600
601            text src is text input, it must be stored into a temp file before uploading
602         """
603         # If source is a string input 
604         f = None
605         if text and not os.path.isfile(src):
606             # src is text input that should be uploaded as file
607             # create a temporal file with the content to upload
608             f = tempfile.NamedTemporaryFile(delete=False)
609             f.write(src)
610             f.close()
611             src = f.name
612
613         # If dst files should not be overwritten, check that the files do not
614         # exits already 
615         if overwrite == False:
616             src = self.filter_existing_files(src, dst)
617             if not src:
618                 return ("", ""), None 
619
620         if not self.localhost:
621             # Build destination as <user>@<server>:<path>
622             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
623
624         result = self.copy(src, dst)
625
626         # clean up temp file
627         if f:
628             os.remove(f.name)
629
630         return result
631
632     def download(self, src, dst):
633         if not self.localhost:
634             # Build destination as <user>@<server>:<path>
635             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
636         return self.copy(src, dst)
637
638     def install_packages(self, packages, home, run_home = None):
639         """ Install packages in the Linux host.
640
641         'home' is the directory to upload the package installation script.
642         'run_home' is the directory from where to execute the script.
643         """
644         command = ""
645         if self.use_rpm:
646             command = rpmfuncs.install_packages_command(self.os, packages)
647         elif self.use_deb:
648             command = debfuncs.install_packages_command(self.os, packages)
649         else:
650             msg = "Error installing packages ( OS not known ) "
651             self.error(msg, self.os)
652             raise RuntimeError, msg
653
654         run_home = run_home or home
655
656         (out, err), proc = self.run_and_wait(command, run_home, 
657             shfile = os.path.join(home, "instpkg.sh"),
658             pidfile = "instpkg_pidfile",
659             ecodefile = "instpkg_exitcode",
660             stdout = "instpkg_stdout", 
661             stderr = "instpkg_stderr",
662             overwrite = False,
663             raise_on_error = True)
664
665         return (out, err), proc 
666
667     def remove_packages(self, packages, home, run_home = None):
668         """ Uninstall packages from the Linux host.
669
670         'home' is the directory to upload the package un-installation script.
671         'run_home' is the directory from where to execute the script.
672         """
673         if self.use_rpm:
674             command = rpmfuncs.remove_packages_command(self.os, packages)
675         elif self.use_deb:
676             command = debfuncs.remove_packages_command(self.os, packages)
677         else:
678             msg = "Error removing packages ( OS not known ) "
679             self.error(msg)
680             raise RuntimeError, msg
681
682         run_home = run_home or home
683
684         (out, err), proc = self.run_and_wait(command, run_home, 
685             shfile = os.path.join(home, "rmpkg.sh"),
686             pidfile = "rmpkg_pidfile",
687             ecodefile = "rmpkg_exitcode",
688             stdout = "rmpkg_stdout", 
689             stderr = "rmpkg_stderr",
690             overwrite = False,
691             raise_on_error = True)
692          
693         return (out, err), proc 
694
695     def mkdir(self, path, clean = False):
696         if clean:
697             self.rmdir(path)
698
699         return self.execute("mkdir -p %s" % path, with_lock = True)
700
701     def rmdir(self, path):
702         return self.execute("rm -rf %s" % path, with_lock = True)
703         
704     def run_and_wait(self, command, home, 
705             shfile = "cmd.sh",
706             env = None,
707             overwrite = True,
708             pidfile = "pidfile", 
709             ecodefile = "exitcode", 
710             stdin = None, 
711             stdout = "stdout", 
712             stderr = "stderr", 
713             sudo = False,
714             tty = False,
715             raise_on_error = False):
716         """
717         Uploads the 'command' to a bash script in the host.
718         Then runs the script detached in background in the host, and
719         busy-waites until the script finishes executing.
720         """
721
722         if not shfile.startswith("/"):
723             shfile = os.path.join(home, shfile)
724
725         self.upload_command(command, 
726             shfile = shfile, 
727             ecodefile = ecodefile, 
728             env = env,
729             overwrite = overwrite)
730
731         command = "bash %s" % shfile
732         # run command in background in remote host
733         (out, err), proc = self.run(command, home, 
734                 pidfile = pidfile,
735                 stdin = stdin, 
736                 stdout = stdout, 
737                 stderr = stderr, 
738                 sudo = sudo,
739                 tty = tty)
740
741         # check no errors occurred
742         if proc.poll():
743             msg = " Failed to run command '%s' " % command
744             self.error(msg, out, err)
745             if raise_on_error:
746                 raise RuntimeError, msg
747
748         # Wait for pid file to be generated
749         pid, ppid = self.wait_pid(
750                 home = home, 
751                 pidfile = pidfile, 
752                 raise_on_error = raise_on_error)
753
754         # wait until command finishes to execute
755         self.wait_run(pid, ppid)
756       
757         (eout, err), proc = self.check_errors(home,
758             ecodefile = ecodefile,
759             stderr = stderr)
760
761         # Out is what was written in the stderr file
762         if err:
763             msg = " Failed to run command '%s' " % command
764             self.error(msg, eout, err)
765
766             if raise_on_error:
767                 raise RuntimeError, msg
768
769         (out, oerr), proc = self.check_output(home, stdout)
770         
771         return (out, err), proc
772
773     def exitcode(self, home, ecodefile = "exitcode"):
774         """
775         Get the exit code of an application.
776         Returns an integer value with the exit code 
777         """
778         (out, err), proc = self.check_output(home, ecodefile)
779
780         # Succeeded to open file, return exit code in the file
781         if proc.wait() == 0:
782             try:
783                 return int(out.strip())
784             except:
785                 # Error in the content of the file!
786                 return ExitCode.CORRUPTFILE
787
788         # No such file or directory
789         if proc.returncode == 1:
790             return ExitCode.FILENOTFOUND
791         
792         # Other error from 'cat'
793         return ExitCode.ERROR
794
795     def upload_command(self, command, 
796             shfile = "cmd.sh",
797             ecodefile = "exitcode",
798             overwrite = True,
799             env = None):
800         """ Saves the command as a bash script file in the remote host, and
801         forces to save the exit code of the command execution to the ecodefile
802         """
803
804         if not (command.strip().endswith(";") or command.strip().endswith("&")):
805             command += ";"
806       
807         # The exit code of the command will be stored in ecodefile
808         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
809                 'command': command,
810                 'ecodefile': ecodefile,
811                 } 
812
813         # Export environment
814         environ = self.format_environment(env)
815
816         # Add environ to command
817         command = environ + command
818
819         return self.upload(command, shfile, text = True, overwrite = overwrite)
820
821     def format_environment(self, env, inline = False):
822         """ Formats the environment variables for a command to be executed
823         either as an inline command
824         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
825         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
826         """
827         if not env: return ""
828
829         # Remove extra white spaces
830         env = re.sub(r'\s+', ' ', env.strip())
831
832         sep = ";" if inline else "\n"
833         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
834
835     def check_errors(self, home, 
836             ecodefile = "exitcode", 
837             stderr = "stderr"):
838         """ Checks whether errors occurred while running a command.
839         It first checks the exit code for the command, and only if the
840         exit code is an error one it returns the error output.
841
842         """
843         proc = None
844         err = ""
845
846         # get exit code saved in the 'exitcode' file
847         ecode = self.exitcode(home, ecodefile)
848
849         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
850             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
851         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
852             # The process returned an error code or didn't exist. 
853             # Check standard error.
854             (err, eerr), proc = self.check_output(home, stderr)
855
856             # If the stderr file was not found, assume nothing bad happened,
857             # and just ignore the error.
858             # (cat returns 1 for error "No such file or directory")
859             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
860                 err = "" 
861             
862         return ("", err), proc
863  
864     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
865         """ Waits until the pid file for the command is generated, 
866             and returns the pid and ppid of the process """
867         pid = ppid = None
868         delay = 1.0
869
870         for i in xrange(4):
871             pidtuple = self.getpid(home = home, pidfile = pidfile)
872             
873             if pidtuple:
874                 pid, ppid = pidtuple
875                 break
876             else:
877                 time.sleep(delay)
878                 delay = delay * 1.5
879         else:
880             msg = " Failed to get pid for pidfile %s/%s " % (
881                     home, pidfile )
882             self.error(msg)
883             
884             if raise_on_error:
885                 raise RuntimeError, msg
886
887         return pid, ppid
888
889     def wait_run(self, pid, ppid, trial = 0):
890         """ wait for a remote process to finish execution """
891         start_delay = 1.0
892
893         while True:
894             status = self.status(pid, ppid)
895             
896             if status is ProcStatus.FINISHED:
897                 break
898             elif status is not ProcStatus.RUNNING:
899                 delay = delay * 1.5
900                 time.sleep(delay)
901                 # If it takes more than 20 seconds to start, then
902                 # asume something went wrong
903                 if delay > 20:
904                     break
905             else:
906                 # The app is running, just wait...
907                 time.sleep(0.5)
908
909     def check_output(self, home, filename):
910         """ Retrives content of file """
911         (out, err), proc = self.execute("cat %s" % 
912             os.path.join(home, filename), retry = 1, with_lock = True)
913         return (out, err), proc
914
915     def is_alive(self):
916         """ Checks if host is responsive
917         """
918         if self.localhost:
919             return True
920
921         out = err = ""
922         try:
923             (out, err), proc = self.execute("echo 'ALIVE'",
924                     retry = 5, 
925                     with_lock = True)
926         except:
927             trace = traceback.format_exc()
928             msg = "Unresponsive host  %s " % err
929             self.error(msg, out, trace)
930             return False
931
932         if out.strip() == "ALIVE":
933             return True
934         else:
935             msg = "Unresponsive host "
936             self.error(msg, out, err)
937             return False
938
939     def find_home(self):
940         """ Retrieves host home directory
941         """
942         (out, err), proc = self.execute("echo ${HOME}", retry = 5, 
943                     with_lock = True)
944
945         if proc.poll():
946             msg = "Imposible to retrieve HOME directory"
947             self.error(msg, out, err)
948             raise RuntimeError, msg
949
950         self._home_dir =  out.strip()
951
952     def filter_existing_files(self, src, dst):
953         """ Removes files that already exist in the Linux host from src list
954         """
955         # construct a dictionary with { dst: src }
956         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
957             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
958
959         command = []
960         for d in dests.keys():
961             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
962
963         command = ";".join(command)
964
965         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
966     
967         for d in dests.keys():
968             if out.find(d) > -1:
969                 del dests[d]
970
971         if not dests:
972             return ""
973
974         return " ".join(dests.values())
975