Adding gateway attr and modify CleanProcess
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.ExecReadOnly)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.ExecReadOnly)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.ExecReadOnly)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.ExecReadOnly)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.ExecReadOnly)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.ExecReadOnly)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.ExecReadOnly)
190
191         gateway_user = Attribute("gatewayUser", "Gateway account username",
192                 flags = Flags.ExecReadOnly)
193
194         gateway = Attribute("gateway", "Hostname of the gateway machine",
195                 flags = Flags.ExecReadOnly)
196
197         cls._register_attribute(hostname)
198         cls._register_attribute(username)
199         cls._register_attribute(port)
200         cls._register_attribute(home)
201         cls._register_attribute(identity)
202         cls._register_attribute(server_key)
203         cls._register_attribute(clean_home)
204         cls._register_attribute(clean_experiment)
205         cls._register_attribute(clean_processes)
206         cls._register_attribute(tear_down)
207         cls._register_attribute(gateway_user)
208         cls._register_attribute(gateway)
209
210     def __init__(self, ec, guid):
211         super(LinuxNode, self).__init__(ec, guid)
212         self._os = None
213         # home directory at Linux host
214         self._home_dir = ""
215
216         # list of pids before running the app if the user is root
217         self._pids = []
218         
219         # lock to prevent concurrent applications on the same node,
220         # to execute commands at the same time. There are potential
221         # concurrency issues when using SSH to a same host from 
222         # multiple threads. There are also possible operational 
223         # issues, e.g. an application querying the existence 
224         # of a file or folder prior to its creation, and another 
225         # application creating the same file or folder in between.
226         self._node_lock = threading.Lock()
227     
228     def log_message(self, msg):
229         return " guid %d - host %s - %s " % (self.guid, 
230                 self.get("hostname"), msg)
231
232     @property
233     def home_dir(self):
234         home = self.get("home") or ""
235         if not home.startswith("/"):
236            home = os.path.join(self._home_dir, home) 
237         return home
238
239     @property
240     def usr_dir(self):
241         return os.path.join(self.home_dir, "nepi-usr")
242
243     @property
244     def lib_dir(self):
245         return os.path.join(self.usr_dir, "lib")
246
247     @property
248     def bin_dir(self):
249         return os.path.join(self.usr_dir, "bin")
250
251     @property
252     def src_dir(self):
253         return os.path.join(self.usr_dir, "src")
254
255     @property
256     def share_dir(self):
257         return os.path.join(self.usr_dir, "share")
258
259     @property
260     def exp_dir(self):
261         return os.path.join(self.home_dir, "nepi-exp")
262
263     @property
264     def exp_home(self):
265         return os.path.join(self.exp_dir, self.ec.exp_id)
266
267     @property
268     def node_home(self):
269         return os.path.join(self.exp_home, "node-%d" % self.guid)
270
271     @property
272     def run_home(self):
273         return os.path.join(self.node_home, self.ec.run_id)
274
275     @property
276     def os(self):
277         if self._os:
278             return self._os
279
280         if (not self.get("hostname") or not self.get("username")):
281             msg = "Can't resolve OS, insufficient data "
282             self.error(msg)
283             raise RuntimeError, msg
284
285         out = self.get_os()
286
287         if out.find("Fedora release 8") == 0:
288             self._os = OSType.FEDORA_8
289         elif out.find("Fedora release 12") == 0:
290             self._os = OSType.FEDORA_12
291         elif out.find("Fedora release 14") == 0:
292             self._os = OSType.FEDORA_14
293         elif out.find("Fedora release") == 0:
294             self._os = OSType.FEDORA
295         elif out.find("Debian") == 0: 
296             self._os = OSType.DEBIAN
297         elif out.find("Ubuntu") ==0:
298             self._os = OSType.UBUNTU
299         else:
300             msg = "Unsupported OS"
301             self.error(msg, out)
302             raise RuntimeError, "%s - %s " %( msg, out )
303
304         return self._os
305
306     def get_os(self):
307         # The underlying SSH layer will sometimes return an empty
308         # output (even if the command was executed without errors).
309         # To work arround this, repeat the operation N times or
310         # until the result is not empty string
311         out = ""
312         try:
313             (out, err), proc = self.execute("cat /etc/issue", 
314                     with_lock = True,
315                     blocking = True)
316         except:
317             trace = traceback.format_exc()
318             msg = "Error detecting OS: %s " % trace
319             self.error(msg, out, err)
320         
321         return out
322
323     @property
324     def use_deb(self):
325         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
326
327     @property
328     def use_rpm(self):
329         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
330                 OSType.FEDORA]
331
332     @property
333     def localhost(self):
334         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
335
336     def do_provision(self):
337         # check if host is alive
338         if not self.is_alive():
339             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
340             self.error(msg)
341             raise RuntimeError, msg
342
343         self.find_home()
344
345         if self.get("cleanProcesses"):
346             self.clean_processes()
347
348         if self.get("cleanHome"):
349             self.clean_home()
350  
351         if self.get("cleanExperiment"):
352             self.clean_experiment()
353     
354         # Create shared directory structure
355         self.mkdir(self.lib_dir)
356         self.mkdir(self.bin_dir)
357         self.mkdir(self.src_dir)
358         self.mkdir(self.share_dir)
359
360         # Create experiment node home directory
361         self.mkdir(self.node_home)
362
363         super(LinuxNode, self).do_provision()
364
365     def do_deploy(self):
366         if self.state == ResourceState.NEW:
367             self.info("Deploying node")
368             self.do_discover()
369             self.do_provision()
370
371         # Node needs to wait until all associated interfaces are 
372         # ready before it can finalize deployment
373         from nepi.resources.linux.interface import LinuxInterface
374         ifaces = self.get_connected(LinuxInterface.get_rtype())
375         for iface in ifaces:
376             if iface.state < ResourceState.READY:
377                 self.ec.schedule(reschedule_delay, self.deploy)
378                 return 
379
380         super(LinuxNode, self).do_deploy()
381
382     def do_release(self):
383         rms = self.get_connected()
384         for rm in rms:
385             # Node needs to wait until all associated RMs are released
386             # before it can be released
387             if rm.state != ResourceState.RELEASED:
388                 self.ec.schedule(reschedule_delay, self.release)
389                 return 
390
391         tear_down = self.get("tearDown")
392         if tear_down:
393             self.execute(tear_down)
394
395         self.clean_processes()
396
397         super(LinuxNode, self).do_release()
398
399     def valid_connection(self, guid):
400         # TODO: Validate!
401         return True
402
403     def clean_processes(self):
404         self.info("Cleaning up processes")
405  
406         if self.get("username") != 'root':
407             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
408                 "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
409                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
410         else:
411             pids_temp = []
412             if self.state >= ResourceState.READY:
413                 ps_aux = "ps aux |awk '{print $2}' |sort -u"
414                 (out, err), proc = self.execute(ps_aux)
415                 pids_temp = out.split()
416                 kill_pids = list(set(pids_temp) - set(self._pids))
417                 kill_pids = ' '.join(kill_pids)
418
419                 cmd = ("killall tcpdump || /bin/true ; " +
420                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
421                     "kill %s || /bin/true ; " % kill_pids)
422             else:
423                 cmd = ("killall tcpdump || /bin/true ; " +
424                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
425
426         out = err = ""
427         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
428
429     def clean_home(self):
430         """ Cleans all NEPI related folders in the Linux host
431         """
432         self.info("Cleaning up home")
433         
434         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
435                 self.home_dir )
436
437         return self.execute(cmd, with_lock = True)
438
439     def clean_experiment(self):
440         """ Cleans all experiment related files in the Linux host.
441         It preserves NEPI files and folders that have a multi experiment
442         scope.
443         """
444         self.info("Cleaning up experiment files")
445         
446         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
447                 self.exp_dir,
448                 self.ec.exp_id )
449             
450         return self.execute(cmd, with_lock = True)
451
452     def execute(self, command,
453             sudo = False,
454             stdin = None, 
455             env = None,
456             tty = False,
457             forward_x11 = False,
458             timeout = None,
459             retry = 3,
460             err_on_timeout = True,
461             connect_timeout = 30,
462             strict_host_checking = False,
463             persistent = True,
464             blocking = True,
465             with_lock = False
466             ):
467         """ Notice that this invocation will block until the
468         execution finishes. If this is not the desired behavior,
469         use 'run' instead."""
470
471         if self.localhost:
472             (out, err), proc = execfuncs.lexec(command, 
473                     user = self.get("username"), # still problem with localhost
474                     sudo = sudo,
475                     stdin = stdin,
476                     env = env)
477         else:
478             if with_lock:
479                 with self._node_lock:
480                     (out, err), proc = sshfuncs.rexec(
481                         command, 
482                         host = self.get("hostname"),
483                         user = self.get("username"),
484                         port = self.get("port"),
485                         gwuser = self.get("gatewayUser"),
486                         gw = self.get("gateway"),
487                         agent = True,
488                         sudo = sudo,
489                         stdin = stdin,
490                         identity = self.get("identity"),
491                         server_key = self.get("serverKey"),
492                         env = env,
493                         tty = tty,
494                         forward_x11 = forward_x11,
495                         timeout = timeout,
496                         retry = retry,
497                         err_on_timeout = err_on_timeout,
498                         connect_timeout = connect_timeout,
499                         persistent = persistent,
500                         blocking = blocking, 
501                         strict_host_checking = strict_host_checking
502                         )
503             else:
504                 (out, err), proc = sshfuncs.rexec(
505                     command, 
506                     host = self.get("hostname"),
507                     user = self.get("username"),
508                     port = self.get("port"),
509                     gwuser = self.get("gatewayUser"),
510                     gw = self.get("gateway"),
511                     agent = True,
512                     sudo = sudo,
513                     stdin = stdin,
514                     identity = self.get("identity"),
515                     server_key = self.get("serverKey"),
516                     env = env,
517                     tty = tty,
518                     forward_x11 = forward_x11,
519                     timeout = timeout,
520                     retry = retry,
521                     err_on_timeout = err_on_timeout,
522                     connect_timeout = connect_timeout,
523                     persistent = persistent,
524                     blocking = blocking, 
525                     strict_host_checking = strict_host_checking
526                     )
527
528         return (out, err), proc
529
530     def run(self, command, home,
531             create_home = False,
532             pidfile = 'pidfile',
533             stdin = None, 
534             stdout = 'stdout', 
535             stderr = 'stderr', 
536             sudo = False,
537             tty = False):
538         
539         self.debug("Running command '%s'" % command)
540         
541         if self.localhost:
542             (out, err), proc = execfuncs.lspawn(command, pidfile, 
543                     stdout = stdout, 
544                     stderr = stderr, 
545                     stdin = stdin, 
546                     home = home, 
547                     create_home = create_home, 
548                     sudo = sudo,
549                     user = user) 
550         else:
551             with self._node_lock:
552                 (out, err), proc = sshfuncs.rspawn(
553                     command,
554                     pidfile = pidfile,
555                     home = home,
556                     create_home = create_home,
557                     stdin = stdin if stdin is not None else '/dev/null',
558                     stdout = stdout if stdout else '/dev/null',
559                     stderr = stderr if stderr else '/dev/null',
560                     sudo = sudo,
561                     host = self.get("hostname"),
562                     user = self.get("username"),
563                     port = self.get("port"),
564                     gwuser = self.get("gatewayUser"),
565                     gw = self.get("gateway"),
566                     agent = True,
567                     identity = self.get("identity"),
568                     server_key = self.get("serverKey"),
569                     tty = tty
570                     )
571
572         return (out, err), proc
573
574     def getpid(self, home, pidfile = "pidfile"):
575         if self.localhost:
576             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
577         else:
578             with self._node_lock:
579                 pidtuple = sshfuncs.rgetpid(
580                     os.path.join(home, pidfile),
581                     host = self.get("hostname"),
582                     user = self.get("username"),
583                     port = self.get("port"),
584                     gwuser = self.get("gatewayUser"),
585                     gw = self.get("gateway"),
586                     agent = True,
587                     identity = self.get("identity"),
588                     server_key = self.get("serverKey")
589                     )
590         
591         return pidtuple
592
593     def status(self, pid, ppid):
594         if self.localhost:
595             status = execfuncs.lstatus(pid, ppid)
596         else:
597             with self._node_lock:
598                 status = sshfuncs.rstatus(
599                         pid, ppid,
600                         host = self.get("hostname"),
601                         user = self.get("username"),
602                         port = self.get("port"),
603                         gwuser = self.get("gatewayUser"),
604                         gw = self.get("gateway"),
605                         agent = True,
606                         identity = self.get("identity"),
607                         server_key = self.get("serverKey")
608                         )
609            
610         return status
611     
612     def kill(self, pid, ppid, sudo = False):
613         out = err = ""
614         proc = None
615         status = self.status(pid, ppid)
616
617         if status == sshfuncs.ProcStatus.RUNNING:
618             if self.localhost:
619                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
620             else:
621                 with self._node_lock:
622                     (out, err), proc = sshfuncs.rkill(
623                         pid, ppid,
624                         host = self.get("hostname"),
625                         user = self.get("username"),
626                         port = self.get("port"),
627                         gwuser = self.get("gatewayUser"),
628                         gw = self.get("gateway"),
629                         agent = True,
630                         sudo = sudo,
631                         identity = self.get("identity"),
632                         server_key = self.get("serverKey")
633                         )
634
635         return (out, err), proc
636
637     def copy(self, src, dst):
638         if self.localhost:
639             (out, err), proc = execfuncs.lcopy(source, dest, 
640                     recursive = True,
641                     strict_host_checking = False)
642         else:
643             with self._node_lock:
644                 (out, err), proc = sshfuncs.rcopy(
645                     src, dst, 
646                     port = self.get("port"),
647                     gwuser = self.get("gatewayUser"),
648                     gw = self.get("gateway"),
649                     identity = self.get("identity"),
650                     server_key = self.get("serverKey"),
651                     recursive = True,
652                     strict_host_checking = False)
653
654         return (out, err), proc
655
656     def upload(self, src, dst, text = False, overwrite = True):
657         """ Copy content to destination
658
659            src  content to copy. Can be a local file, directory or a list of files
660
661            dst  destination path on the remote host (remote is always self.host)
662
663            text src is text input, it must be stored into a temp file before uploading
664         """
665         # If source is a string input 
666         f = None
667         if text and not os.path.isfile(src):
668             # src is text input that should be uploaded as file
669             # create a temporal file with the content to upload
670             f = tempfile.NamedTemporaryFile(delete=False)
671             f.write(src)
672             f.close()
673             src = f.name
674
675         # If dst files should not be overwritten, check that the files do not
676         # exits already 
677         if overwrite == False:
678             src = self.filter_existing_files(src, dst)
679             if not src:
680                 return ("", ""), None 
681
682         if not self.localhost:
683             # Build destination as <user>@<server>:<path>
684             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
685
686         result = self.copy(src, dst)
687
688         # clean up temp file
689         if f:
690             os.remove(f.name)
691
692         return result
693
694     def download(self, src, dst):
695         if not self.localhost:
696             # Build destination as <user>@<server>:<path>
697             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
698         return self.copy(src, dst)
699
700     def install_packages_command(self, packages):
701         command = ""
702         if self.use_rpm:
703             command = rpmfuncs.install_packages_command(self.os, packages)
704         elif self.use_deb:
705             command = debfuncs.install_packages_command(self.os, packages)
706         else:
707             msg = "Error installing packages ( OS not known ) "
708             self.error(msg, self.os)
709             raise RuntimeError, msg
710
711         return command
712
713     def install_packages(self, packages, home, run_home = None):
714         """ Install packages in the Linux host.
715
716         'home' is the directory to upload the package installation script.
717         'run_home' is the directory from where to execute the script.
718         """
719         command = self.install_packages_command(packages)
720
721         run_home = run_home or home
722
723         (out, err), proc = self.run_and_wait(command, run_home, 
724             shfile = os.path.join(home, "instpkg.sh"),
725             pidfile = "instpkg_pidfile",
726             ecodefile = "instpkg_exitcode",
727             stdout = "instpkg_stdout", 
728             stderr = "instpkg_stderr",
729             overwrite = False,
730             raise_on_error = True)
731
732         return (out, err), proc 
733
734     def remove_packages(self, packages, home, run_home = None):
735         """ Uninstall packages from the Linux host.
736
737         'home' is the directory to upload the package un-installation script.
738         'run_home' is the directory from where to execute the script.
739         """
740         if self.use_rpm:
741             command = rpmfuncs.remove_packages_command(self.os, packages)
742         elif self.use_deb:
743             command = debfuncs.remove_packages_command(self.os, packages)
744         else:
745             msg = "Error removing packages ( OS not known ) "
746             self.error(msg)
747             raise RuntimeError, msg
748
749         run_home = run_home or home
750
751         (out, err), proc = self.run_and_wait(command, run_home, 
752             shfile = os.path.join(home, "rmpkg.sh"),
753             pidfile = "rmpkg_pidfile",
754             ecodefile = "rmpkg_exitcode",
755             stdout = "rmpkg_stdout", 
756             stderr = "rmpkg_stderr",
757             overwrite = False,
758             raise_on_error = True)
759          
760         return (out, err), proc 
761
762     def mkdir(self, path, clean = False):
763         if clean:
764             self.rmdir(path)
765
766         return self.execute("mkdir -p %s" % path, with_lock = True)
767
768     def rmdir(self, path):
769         return self.execute("rm -rf %s" % path, with_lock = True)
770         
771     def run_and_wait(self, command, home, 
772             shfile = "cmd.sh",
773             env = None,
774             overwrite = True,
775             pidfile = "pidfile", 
776             ecodefile = "exitcode", 
777             stdin = None, 
778             stdout = "stdout", 
779             stderr = "stderr", 
780             sudo = False,
781             tty = False,
782             raise_on_error = False):
783         """
784         Uploads the 'command' to a bash script in the host.
785         Then runs the script detached in background in the host, and
786         busy-waites until the script finishes executing.
787         """
788
789         if not shfile.startswith("/"):
790             shfile = os.path.join(home, shfile)
791
792         self.upload_command(command, 
793             shfile = shfile, 
794             ecodefile = ecodefile, 
795             env = env,
796             overwrite = overwrite)
797
798         command = "bash %s" % shfile
799         # run command in background in remote host
800         (out, err), proc = self.run(command, home, 
801                 pidfile = pidfile,
802                 stdin = stdin, 
803                 stdout = stdout, 
804                 stderr = stderr, 
805                 sudo = sudo,
806                 tty = tty)
807
808         # check no errors occurred
809         if proc.poll():
810             msg = " Failed to run command '%s' " % command
811             self.error(msg, out, err)
812             if raise_on_error:
813                 raise RuntimeError, msg
814
815         # Wait for pid file to be generated
816         pid, ppid = self.wait_pid(
817                 home = home, 
818                 pidfile = pidfile, 
819                 raise_on_error = raise_on_error)
820
821         # wait until command finishes to execute
822         self.wait_run(pid, ppid)
823       
824         (eout, err), proc = self.check_errors(home,
825             ecodefile = ecodefile,
826             stderr = stderr)
827
828         # Out is what was written in the stderr file
829         if err:
830             msg = " Failed to run command '%s' " % command
831             self.error(msg, eout, err)
832
833             if raise_on_error:
834                 raise RuntimeError, msg
835
836         (out, oerr), proc = self.check_output(home, stdout)
837         
838         return (out, err), proc
839
840     def exitcode(self, home, ecodefile = "exitcode"):
841         """
842         Get the exit code of an application.
843         Returns an integer value with the exit code 
844         """
845         (out, err), proc = self.check_output(home, ecodefile)
846
847         # Succeeded to open file, return exit code in the file
848         if proc.wait() == 0:
849             try:
850                 return int(out.strip())
851             except:
852                 # Error in the content of the file!
853                 return ExitCode.CORRUPTFILE
854
855         # No such file or directory
856         if proc.returncode == 1:
857             return ExitCode.FILENOTFOUND
858         
859         # Other error from 'cat'
860         return ExitCode.ERROR
861
862     def upload_command(self, command, 
863             shfile = "cmd.sh",
864             ecodefile = "exitcode",
865             overwrite = True,
866             env = None):
867         """ Saves the command as a bash script file in the remote host, and
868         forces to save the exit code of the command execution to the ecodefile
869         """
870
871         if not (command.strip().endswith(";") or command.strip().endswith("&")):
872             command += ";"
873       
874         # The exit code of the command will be stored in ecodefile
875         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
876                 'command': command,
877                 'ecodefile': ecodefile,
878                 } 
879
880         # Export environment
881         environ = self.format_environment(env)
882
883         # Add environ to command
884         command = environ + command
885
886         return self.upload(command, shfile, text = True, overwrite = overwrite)
887
888     def format_environment(self, env, inline = False):
889         """ Formats the environment variables for a command to be executed
890         either as an inline command
891         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
892         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
893         """
894         if not env: return ""
895
896         # Remove extra white spaces
897         env = re.sub(r'\s+', ' ', env.strip())
898
899         sep = ";" if inline else "\n"
900         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
901
902     def check_errors(self, home, 
903             ecodefile = "exitcode", 
904             stderr = "stderr"):
905         """ Checks whether errors occurred while running a command.
906         It first checks the exit code for the command, and only if the
907         exit code is an error one it returns the error output.
908
909         """
910         proc = None
911         err = ""
912
913         # get exit code saved in the 'exitcode' file
914         ecode = self.exitcode(home, ecodefile)
915
916         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
917             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
918         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
919             # The process returned an error code or didn't exist. 
920             # Check standard error.
921             (err, eerr), proc = self.check_output(home, stderr)
922
923             # If the stderr file was not found, assume nothing bad happened,
924             # and just ignore the error.
925             # (cat returns 1 for error "No such file or directory")
926             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
927                 err = "" 
928             
929         return ("", err), proc
930  
931     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
932         """ Waits until the pid file for the command is generated, 
933             and returns the pid and ppid of the process """
934         pid = ppid = None
935         delay = 1.0
936
937         for i in xrange(2):
938             pidtuple = self.getpid(home = home, pidfile = pidfile)
939             
940             if pidtuple:
941                 pid, ppid = pidtuple
942                 break
943             else:
944                 time.sleep(delay)
945                 delay = delay * 1.5
946         else:
947             msg = " Failed to get pid for pidfile %s/%s " % (
948                     home, pidfile )
949             self.error(msg)
950             
951             if raise_on_error:
952                 raise RuntimeError, msg
953
954         return pid, ppid
955
956     def wait_run(self, pid, ppid, trial = 0):
957         """ wait for a remote process to finish execution """
958         delay = 1.0
959
960         while True:
961             status = self.status(pid, ppid)
962             
963             if status is ProcStatus.FINISHED:
964                 break
965             elif status is not ProcStatus.RUNNING:
966                 delay = delay * 1.5
967                 time.sleep(delay)
968                 # If it takes more than 20 seconds to start, then
969                 # asume something went wrong
970                 if delay > 20:
971                     break
972             else:
973                 # The app is running, just wait...
974                 time.sleep(0.5)
975
976     def check_output(self, home, filename):
977         """ Retrives content of file """
978         (out, err), proc = self.execute("cat %s" % 
979             os.path.join(home, filename), retry = 1, with_lock = True)
980         return (out, err), proc
981
982     def is_alive(self):
983         """ Checks if host is responsive
984         """
985         if self.localhost:
986             return True
987
988         out = err = ""
989         msg = "Unresponsive host. Wrong answer. "
990
991         # The underlying SSH layer will sometimes return an empty
992         # output (even if the command was executed without errors).
993         # To work arround this, repeat the operation N times or
994         # until the result is not empty string
995         try:
996             (out, err), proc = self.execute("echo 'ALIVE'",
997                     blocking = True,
998                     with_lock = True)
999     
1000             if out.find("ALIVE") > -1:
1001                 return True
1002         except:
1003             trace = traceback.format_exc()
1004             msg = "Unresponsive host. Error reaching host: %s " % trace
1005
1006         self.error(msg, out, err)
1007         return False
1008
1009     def find_home(self):
1010         """ Retrieves host home directory
1011         """
1012         # The underlying SSH layer will sometimes return an empty
1013         # output (even if the command was executed without errors).
1014         # To work arround this, repeat the operation N times or
1015         # until the result is not empty string
1016         msg = "Impossible to retrieve HOME directory"
1017         try:
1018             (out, err), proc = self.execute("echo ${HOME}",
1019                     blocking = True,
1020                     with_lock = True)
1021     
1022             if out.strip() != "":
1023                 self._home_dir =  out.strip()
1024         except:
1025             trace = traceback.format_exc()
1026             msg = "Impossible to retrieve HOME directory %s" % trace
1027
1028         if not self._home_dir:
1029             self.error(msg)
1030             raise RuntimeError, msg
1031
1032     def filter_existing_files(self, src, dst):
1033         """ Removes files that already exist in the Linux host from src list
1034         """
1035         # construct a dictionary with { dst: src }
1036         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
1037             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1038
1039         command = []
1040         for d in dests.keys():
1041             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1042
1043         command = ";".join(command)
1044
1045         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1046     
1047         for d in dests.keys():
1048             if out.find(d) > -1:
1049                 del dests[d]
1050
1051         if not dests:
1052             return ""
1053
1054         return " ".join(dests.values())
1055