Adding linux ns3 server unit test
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.Design)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.Design)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.Design)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.Design)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.Design)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.Design)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.Design)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.Design)
190
191         cls._register_attribute(hostname)
192         cls._register_attribute(username)
193         cls._register_attribute(port)
194         cls._register_attribute(home)
195         cls._register_attribute(identity)
196         cls._register_attribute(server_key)
197         cls._register_attribute(clean_home)
198         cls._register_attribute(clean_experiment)
199         cls._register_attribute(clean_processes)
200         cls._register_attribute(tear_down)
201
202     def __init__(self, ec, guid):
203         super(LinuxNode, self).__init__(ec, guid)
204         self._os = None
205         # home directory at Linux host
206         self._home_dir = ""
207         
208         # lock to prevent concurrent applications on the same node,
209         # to execute commands at the same time. There are potential
210         # concurrency issues when using SSH to a same host from 
211         # multiple threads. There are also possible operational 
212         # issues, e.g. an application querying the existence 
213         # of a file or folder prior to its creation, and another 
214         # application creating the same file or folder in between.
215         self._node_lock = threading.Lock()
216     
217     def log_message(self, msg):
218         return " guid %d - host %s - %s " % (self.guid, 
219                 self.get("hostname"), msg)
220
221     @property
222     def home_dir(self):
223         home = self.get("home") or ""
224         if not home.startswith("/"):
225            home = os.path.join(self._home_dir, home) 
226         return home
227
228     @property
229     def usr_dir(self):
230         return os.path.join(self.home_dir, "nepi-usr")
231
232     @property
233     def lib_dir(self):
234         return os.path.join(self.usr_dir, "lib")
235
236     @property
237     def bin_dir(self):
238         return os.path.join(self.usr_dir, "bin")
239
240     @property
241     def src_dir(self):
242         return os.path.join(self.usr_dir, "src")
243
244     @property
245     def share_dir(self):
246         return os.path.join(self.usr_dir, "share")
247
248     @property
249     def exp_dir(self):
250         return os.path.join(self.home_dir, "nepi-exp")
251
252     @property
253     def exp_home(self):
254         return os.path.join(self.exp_dir, self.ec.exp_id)
255
256     @property
257     def node_home(self):
258         return os.path.join(self.exp_home, "node-%d" % self.guid)
259
260     @property
261     def run_home(self):
262         return os.path.join(self.node_home, self.ec.run_id)
263
264     @property
265     def os(self):
266         if self._os:
267             return self._os
268
269         if (not self.get("hostname") or not self.get("username")):
270             msg = "Can't resolve OS, insufficient data "
271             self.error(msg)
272             raise RuntimeError, msg
273
274         out = self.get_os()
275
276         if out.find("Fedora release 8") == 0:
277             self._os = OSType.FEDORA_8
278         elif out.find("Fedora release 12") == 0:
279             self._os = OSType.FEDORA_12
280         elif out.find("Fedora release 14") == 0:
281             self._os = OSType.FEDORA_14
282         elif out.find("Fedora release") == 0:
283             self._os = OSType.FEDORA
284         elif out.find("Debian") == 0: 
285             self._os = OSType.DEBIAN
286         elif out.find("Ubuntu") ==0:
287             self._os = OSType.UBUNTU
288         else:
289             msg = "Unsupported OS"
290             self.error(msg, out)
291             raise RuntimeError, "%s - %s " %( msg, out )
292
293         return self._os
294
295     def get_os(self):
296         # The underlying SSH layer will sometimes return an empty
297         # output (even if the command was executed without errors).
298         # To work arround this, repeat the operation N times or
299         # until the result is not empty string
300         out = ""
301         try:
302             (out, err), proc = self.execute("cat /etc/issue", 
303                     with_lock = True,
304                     blocking = True)
305         except:
306             trace = traceback.format_exc()
307             msg = "Error detecting OS: %s " % trace
308             self.error(msg, out, err)
309         
310         return out
311
312     @property
313     def use_deb(self):
314         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
315
316     @property
317     def use_rpm(self):
318         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
319                 OSType.FEDORA]
320
321     @property
322     def localhost(self):
323         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
324
325     def do_provision(self):
326         # check if host is alive
327         if not self.is_alive():
328             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
329             self.error(msg)
330             raise RuntimeError, msg
331
332         self.find_home()
333
334         if self.get("cleanProcesses"):
335             self.clean_processes()
336
337         if self.get("cleanHome"):
338             self.clean_home()
339  
340         if self.get("cleanExperiment"):
341             self.clean_experiment()
342     
343         # Create shared directory structure
344         self.mkdir(self.lib_dir)
345         self.mkdir(self.bin_dir)
346         self.mkdir(self.src_dir)
347         self.mkdir(self.share_dir)
348
349         # Create experiment node home directory
350         self.mkdir(self.node_home)
351
352         super(LinuxNode, self).do_provision()
353
354     def do_deploy(self):
355         if self.state == ResourceState.NEW:
356             self.info("Deploying node")
357             self.do_discover()
358             self.do_provision()
359
360         # Node needs to wait until all associated interfaces are 
361         # ready before it can finalize deployment
362         from nepi.resources.linux.interface import LinuxInterface
363         ifaces = self.get_connected(LinuxInterface.get_rtype())
364         for iface in ifaces:
365             if iface.state < ResourceState.READY:
366                 self.ec.schedule(reschedule_delay, self.deploy)
367                 return 
368
369         super(LinuxNode, self).do_deploy()
370
371     def do_release(self):
372         rms = self.get_connected()
373         for rm in rms:
374             # Node needs to wait until all associated RMs are released
375             # before it can be released
376             if rm.state != ResourceState.RELEASED:
377                 self.ec.schedule(reschedule_delay, self.release)
378                 return 
379
380         tear_down = self.get("tearDown")
381         if tear_down:
382             self.execute(tear_down)
383
384         self.clean_processes()
385
386         super(LinuxNode, self).do_release()
387
388     def valid_connection(self, guid):
389         # TODO: Validate!
390         return True
391
392     def clean_processes(self):
393         self.info("Cleaning up processes")
394         
395         cmd = ("sudo -S killall tcpdump || /bin/true ; " +
396             "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
397             "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
398
399         out = err = ""
400         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
401
402     def clean_home(self):
403         """ Cleans all NEPI related folders in the Linux host
404         """
405         self.info("Cleaning up home")
406         
407         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
408                 self.home_dir )
409
410         return self.execute(cmd, with_lock = True)
411
412     def clean_experiment(self):
413         """ Cleans all experiment related files in the Linux host.
414         It preserves NEPI files and folders that have a multi experiment
415         scope.
416         """
417         self.info("Cleaning up experiment files")
418         
419         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
420                 self.exp_dir,
421                 self.ec.exp_id )
422             
423         return self.execute(cmd, with_lock = True)
424
425     def execute(self, command,
426             sudo = False,
427             stdin = None, 
428             env = None,
429             tty = False,
430             forward_x11 = False,
431             timeout = None,
432             retry = 3,
433             err_on_timeout = True,
434             connect_timeout = 30,
435             strict_host_checking = False,
436             persistent = True,
437             blocking = True,
438             with_lock = False
439             ):
440         """ Notice that this invocation will block until the
441         execution finishes. If this is not the desired behavior,
442         use 'run' instead."""
443
444         if self.localhost:
445             (out, err), proc = execfuncs.lexec(command, 
446                     user = self.get("username"), # still problem with localhost
447                     sudo = sudo,
448                     stdin = stdin,
449                     env = env)
450         else:
451             if with_lock:
452                 with self._node_lock:
453                     (out, err), proc = sshfuncs.rexec(
454                         command, 
455                         host = self.get("hostname"),
456                         user = self.get("username"),
457                         port = self.get("port"),
458                         agent = True,
459                         sudo = sudo,
460                         stdin = stdin,
461                         identity = self.get("identity"),
462                         server_key = self.get("serverKey"),
463                         env = env,
464                         tty = tty,
465                         forward_x11 = forward_x11,
466                         timeout = timeout,
467                         retry = retry,
468                         err_on_timeout = err_on_timeout,
469                         connect_timeout = connect_timeout,
470                         persistent = persistent,
471                         blocking = blocking, 
472                         strict_host_checking = strict_host_checking
473                         )
474             else:
475                 (out, err), proc = sshfuncs.rexec(
476                     command, 
477                     host = self.get("hostname"),
478                     user = self.get("username"),
479                     port = self.get("port"),
480                     agent = True,
481                     sudo = sudo,
482                     stdin = stdin,
483                     identity = self.get("identity"),
484                     server_key = self.get("serverKey"),
485                     env = env,
486                     tty = tty,
487                     forward_x11 = forward_x11,
488                     timeout = timeout,
489                     retry = retry,
490                     err_on_timeout = err_on_timeout,
491                     connect_timeout = connect_timeout,
492                     persistent = persistent,
493                     blocking = blocking, 
494                     strict_host_checking = strict_host_checking
495                     )
496
497         return (out, err), proc
498
499     def run(self, command, home,
500             create_home = False,
501             pidfile = 'pidfile',
502             stdin = None, 
503             stdout = 'stdout', 
504             stderr = 'stderr', 
505             sudo = False,
506             tty = False):
507         
508         self.debug("Running command '%s'" % command)
509         
510         if self.localhost:
511             (out, err), proc = execfuncs.lspawn(command, pidfile, 
512                     stdout = stdout, 
513                     stderr = stderr, 
514                     stdin = stdin, 
515                     home = home, 
516                     create_home = create_home, 
517                     sudo = sudo,
518                     user = user) 
519         else:
520             with self._node_lock:
521                 (out, err), proc = sshfuncs.rspawn(
522                     command,
523                     pidfile = pidfile,
524                     home = home,
525                     create_home = create_home,
526                     stdin = stdin if stdin is not None else '/dev/null',
527                     stdout = stdout if stdout else '/dev/null',
528                     stderr = stderr if stderr else '/dev/null',
529                     sudo = sudo,
530                     host = self.get("hostname"),
531                     user = self.get("username"),
532                     port = self.get("port"),
533                     agent = True,
534                     identity = self.get("identity"),
535                     server_key = self.get("serverKey"),
536                     tty = tty
537                     )
538
539         return (out, err), proc
540
541     def getpid(self, home, pidfile = "pidfile"):
542         if self.localhost:
543             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
544         else:
545             with self._node_lock:
546                 pidtuple = sshfuncs.rgetpid(
547                     os.path.join(home, pidfile),
548                     host = self.get("hostname"),
549                     user = self.get("username"),
550                     port = self.get("port"),
551                     agent = True,
552                     identity = self.get("identity"),
553                     server_key = self.get("serverKey")
554                     )
555         
556         return pidtuple
557
558     def status(self, pid, ppid):
559         if self.localhost:
560             status = execfuncs.lstatus(pid, ppid)
561         else:
562             with self._node_lock:
563                 status = sshfuncs.rstatus(
564                         pid, ppid,
565                         host = self.get("hostname"),
566                         user = self.get("username"),
567                         port = self.get("port"),
568                         agent = True,
569                         identity = self.get("identity"),
570                         server_key = self.get("serverKey")
571                         )
572            
573         return status
574     
575     def kill(self, pid, ppid, sudo = False):
576         out = err = ""
577         proc = None
578         status = self.status(pid, ppid)
579
580         if status == sshfuncs.ProcStatus.RUNNING:
581             if self.localhost:
582                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
583             else:
584                 with self._node_lock:
585                     (out, err), proc = sshfuncs.rkill(
586                         pid, ppid,
587                         host = self.get("hostname"),
588                         user = self.get("username"),
589                         port = self.get("port"),
590                         agent = True,
591                         sudo = sudo,
592                         identity = self.get("identity"),
593                         server_key = self.get("serverKey")
594                         )
595
596         return (out, err), proc
597
598     def copy(self, src, dst):
599         if self.localhost:
600             (out, err), proc = execfuncs.lcopy(source, dest, 
601                     recursive = True,
602                     strict_host_checking = False)
603         else:
604             with self._node_lock:
605                 (out, err), proc = sshfuncs.rcopy(
606                     src, dst, 
607                     port = self.get("port"),
608                     identity = self.get("identity"),
609                     server_key = self.get("serverKey"),
610                     recursive = True,
611                     strict_host_checking = False)
612
613         return (out, err), proc
614
615     def upload(self, src, dst, text = False, overwrite = True):
616         """ Copy content to destination
617
618            src  content to copy. Can be a local file, directory or a list of files
619
620            dst  destination path on the remote host (remote is always self.host)
621
622            text src is text input, it must be stored into a temp file before uploading
623         """
624         # If source is a string input 
625         f = None
626         if text and not os.path.isfile(src):
627             # src is text input that should be uploaded as file
628             # create a temporal file with the content to upload
629             f = tempfile.NamedTemporaryFile(delete=False)
630             f.write(src)
631             f.close()
632             src = f.name
633
634         # If dst files should not be overwritten, check that the files do not
635         # exits already 
636         if overwrite == False:
637             src = self.filter_existing_files(src, dst)
638             if not src:
639                 return ("", ""), None 
640
641         if not self.localhost:
642             # Build destination as <user>@<server>:<path>
643             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
644
645         result = self.copy(src, dst)
646
647         # clean up temp file
648         if f:
649             os.remove(f.name)
650
651         return result
652
653     def download(self, src, dst):
654         if not self.localhost:
655             # Build destination as <user>@<server>:<path>
656             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
657         return self.copy(src, dst)
658
659     def install_packages_command(self, packages):
660         command = ""
661         if self.use_rpm:
662             command = rpmfuncs.install_packages_command(self.os, packages)
663         elif self.use_deb:
664             command = debfuncs.install_packages_command(self.os, packages)
665         else:
666             msg = "Error installing packages ( OS not known ) "
667             self.error(msg, self.os)
668             raise RuntimeError, msg
669
670         return command
671
672     def install_packages(self, packages, home, run_home = None):
673         """ Install packages in the Linux host.
674
675         'home' is the directory to upload the package installation script.
676         'run_home' is the directory from where to execute the script.
677         """
678         command = self.install_packages_command(packages)
679
680         run_home = run_home or home
681
682         (out, err), proc = self.run_and_wait(command, run_home, 
683             shfile = os.path.join(home, "instpkg.sh"),
684             pidfile = "instpkg_pidfile",
685             ecodefile = "instpkg_exitcode",
686             stdout = "instpkg_stdout", 
687             stderr = "instpkg_stderr",
688             overwrite = False,
689             raise_on_error = True)
690
691         return (out, err), proc 
692
693     def remove_packages(self, packages, home, run_home = None):
694         """ Uninstall packages from the Linux host.
695
696         'home' is the directory to upload the package un-installation script.
697         'run_home' is the directory from where to execute the script.
698         """
699         if self.use_rpm:
700             command = rpmfuncs.remove_packages_command(self.os, packages)
701         elif self.use_deb:
702             command = debfuncs.remove_packages_command(self.os, packages)
703         else:
704             msg = "Error removing packages ( OS not known ) "
705             self.error(msg)
706             raise RuntimeError, msg
707
708         run_home = run_home or home
709
710         (out, err), proc = self.run_and_wait(command, run_home, 
711             shfile = os.path.join(home, "rmpkg.sh"),
712             pidfile = "rmpkg_pidfile",
713             ecodefile = "rmpkg_exitcode",
714             stdout = "rmpkg_stdout", 
715             stderr = "rmpkg_stderr",
716             overwrite = False,
717             raise_on_error = True)
718          
719         return (out, err), proc 
720
721     def mkdir(self, path, clean = False):
722         if clean:
723             self.rmdir(path)
724
725         return self.execute("mkdir -p %s" % path, with_lock = True)
726
727     def rmdir(self, path):
728         return self.execute("rm -rf %s" % path, with_lock = True)
729         
730     def run_and_wait(self, command, home, 
731             shfile = "cmd.sh",
732             env = None,
733             overwrite = True,
734             pidfile = "pidfile", 
735             ecodefile = "exitcode", 
736             stdin = None, 
737             stdout = "stdout", 
738             stderr = "stderr", 
739             sudo = False,
740             tty = False,
741             raise_on_error = False):
742         """
743         Uploads the 'command' to a bash script in the host.
744         Then runs the script detached in background in the host, and
745         busy-waites until the script finishes executing.
746         """
747
748         if not shfile.startswith("/"):
749             shfile = os.path.join(home, shfile)
750
751         self.upload_command(command, 
752             shfile = shfile, 
753             ecodefile = ecodefile, 
754             env = env,
755             overwrite = overwrite)
756
757         command = "bash %s" % shfile
758         # run command in background in remote host
759         (out, err), proc = self.run(command, home, 
760                 pidfile = pidfile,
761                 stdin = stdin, 
762                 stdout = stdout, 
763                 stderr = stderr, 
764                 sudo = sudo,
765                 tty = tty)
766
767         # check no errors occurred
768         if proc.poll():
769             msg = " Failed to run command '%s' " % command
770             self.error(msg, out, err)
771             if raise_on_error:
772                 raise RuntimeError, msg
773
774         # Wait for pid file to be generated
775         pid, ppid = self.wait_pid(
776                 home = home, 
777                 pidfile = pidfile, 
778                 raise_on_error = raise_on_error)
779
780         # wait until command finishes to execute
781         self.wait_run(pid, ppid)
782       
783         (eout, err), proc = self.check_errors(home,
784             ecodefile = ecodefile,
785             stderr = stderr)
786
787         # Out is what was written in the stderr file
788         if err:
789             msg = " Failed to run command '%s' " % command
790             self.error(msg, eout, err)
791
792             if raise_on_error:
793                 raise RuntimeError, msg
794
795         (out, oerr), proc = self.check_output(home, stdout)
796         
797         return (out, err), proc
798
799     def exitcode(self, home, ecodefile = "exitcode"):
800         """
801         Get the exit code of an application.
802         Returns an integer value with the exit code 
803         """
804         (out, err), proc = self.check_output(home, ecodefile)
805
806         # Succeeded to open file, return exit code in the file
807         if proc.wait() == 0:
808             try:
809                 return int(out.strip())
810             except:
811                 # Error in the content of the file!
812                 return ExitCode.CORRUPTFILE
813
814         # No such file or directory
815         if proc.returncode == 1:
816             return ExitCode.FILENOTFOUND
817         
818         # Other error from 'cat'
819         return ExitCode.ERROR
820
821     def upload_command(self, command, 
822             shfile = "cmd.sh",
823             ecodefile = "exitcode",
824             overwrite = True,
825             env = None):
826         """ Saves the command as a bash script file in the remote host, and
827         forces to save the exit code of the command execution to the ecodefile
828         """
829
830         if not (command.strip().endswith(";") or command.strip().endswith("&")):
831             command += ";"
832       
833         # The exit code of the command will be stored in ecodefile
834         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
835                 'command': command,
836                 'ecodefile': ecodefile,
837                 } 
838
839         # Export environment
840         environ = self.format_environment(env)
841
842         # Add environ to command
843         command = environ + command
844
845         return self.upload(command, shfile, text = True, overwrite = overwrite)
846
847     def format_environment(self, env, inline = False):
848         """ Formats the environment variables for a command to be executed
849         either as an inline command
850         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
851         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
852         """
853         if not env: return ""
854
855         # Remove extra white spaces
856         env = re.sub(r'\s+', ' ', env.strip())
857
858         sep = ";" if inline else "\n"
859         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
860
861     def check_errors(self, home, 
862             ecodefile = "exitcode", 
863             stderr = "stderr"):
864         """ Checks whether errors occurred while running a command.
865         It first checks the exit code for the command, and only if the
866         exit code is an error one it returns the error output.
867
868         """
869         proc = None
870         err = ""
871
872         # get exit code saved in the 'exitcode' file
873         ecode = self.exitcode(home, ecodefile)
874
875         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
876             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
877         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
878             # The process returned an error code or didn't exist. 
879             # Check standard error.
880             (err, eerr), proc = self.check_output(home, stderr)
881
882             # If the stderr file was not found, assume nothing bad happened,
883             # and just ignore the error.
884             # (cat returns 1 for error "No such file or directory")
885             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
886                 err = "" 
887             
888         return ("", err), proc
889  
890     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
891         """ Waits until the pid file for the command is generated, 
892             and returns the pid and ppid of the process """
893         pid = ppid = None
894         delay = 1.0
895
896         for i in xrange(2):
897             pidtuple = self.getpid(home = home, pidfile = pidfile)
898             
899             if pidtuple:
900                 pid, ppid = pidtuple
901                 break
902             else:
903                 time.sleep(delay)
904                 delay = delay * 1.5
905         else:
906             msg = " Failed to get pid for pidfile %s/%s " % (
907                     home, pidfile )
908             self.error(msg)
909             
910             if raise_on_error:
911                 raise RuntimeError, msg
912
913         return pid, ppid
914
915     def wait_run(self, pid, ppid, trial = 0):
916         """ wait for a remote process to finish execution """
917         delay = 1.0
918
919         while True:
920             status = self.status(pid, ppid)
921             
922             if status is ProcStatus.FINISHED:
923                 break
924             elif status is not ProcStatus.RUNNING:
925                 delay = delay * 1.5
926                 time.sleep(delay)
927                 # If it takes more than 20 seconds to start, then
928                 # asume something went wrong
929                 if delay > 20:
930                     break
931             else:
932                 # The app is running, just wait...
933                 time.sleep(0.5)
934
935     def check_output(self, home, filename):
936         """ Retrives content of file """
937         (out, err), proc = self.execute("cat %s" % 
938             os.path.join(home, filename), retry = 1, with_lock = True)
939         return (out, err), proc
940
941     def is_alive(self):
942         """ Checks if host is responsive
943         """
944         if self.localhost:
945             return True
946
947         out = err = ""
948         msg = "Unresponsive host. Wrong answer. "
949
950         # The underlying SSH layer will sometimes return an empty
951         # output (even if the command was executed without errors).
952         # To work arround this, repeat the operation N times or
953         # until the result is not empty string
954         try:
955             (out, err), proc = self.execute("echo 'ALIVE'",
956                     blocking = True,
957                     with_lock = True)
958     
959             if out.find("ALIVE") > -1:
960                 return True
961         except:
962             trace = traceback.format_exc()
963             msg = "Unresponsive host. Error reaching host: %s " % trace
964
965         self.error(msg, out, err)
966         return False
967
968     def find_home(self):
969         """ Retrieves host home directory
970         """
971         # The underlying SSH layer will sometimes return an empty
972         # output (even if the command was executed without errors).
973         # To work arround this, repeat the operation N times or
974         # until the result is not empty string
975         msg = "Impossible to retrieve HOME directory"
976         try:
977             (out, err), proc = self.execute("echo ${HOME}",
978                     blocking = True,
979                     with_lock = True)
980     
981             if out.strip() != "":
982                 self._home_dir =  out.strip()
983         except:
984             trace = traceback.format_exc()
985             msg = "Impossible to retrieve HOME directory %s" % trace
986
987         if not self._home_dir:
988             self.error(msg)
989             raise RuntimeError, msg
990
991     def filter_existing_files(self, src, dst):
992         """ Removes files that already exist in the Linux host from src list
993         """
994         # construct a dictionary with { dst: src }
995         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
996             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
997
998         command = []
999         for d in dests.keys():
1000             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1001
1002         command = ";".join(command)
1003
1004         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1005     
1006         for d in dests.keys():
1007             if out.find(d) > -1:
1008                 del dests[d]
1009
1010         if not dests:
1011             return ""
1012
1013         return " ".join(dests.values())
1014