Update method CleanProcess for linux node when user is root
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.ExecReadOnly)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.ExecReadOnly)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.ExecReadOnly)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.ExecReadOnly)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.ExecReadOnly)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.ExecReadOnly)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.ExecReadOnly)
190
191         gateway_user = Attribute("gatewayUser", "Gateway account username",
192                 flags = Flags.ExecReadOnly)
193
194         gateway = Attribute("gateway", "Hostname of the gateway machine",
195                 flags = Flags.ExecReadOnly)
196
197         cls._register_attribute(hostname)
198         cls._register_attribute(username)
199         cls._register_attribute(port)
200         cls._register_attribute(home)
201         cls._register_attribute(identity)
202         cls._register_attribute(server_key)
203         cls._register_attribute(clean_home)
204         cls._register_attribute(clean_experiment)
205         cls._register_attribute(clean_processes)
206         cls._register_attribute(tear_down)
207         cls._register_attribute(gateway_user)
208         cls._register_attribute(gateway)
209
210     def __init__(self, ec, guid):
211         super(LinuxNode, self).__init__(ec, guid)
212         self._os = None
213         # home directory at Linux host
214         self._home_dir = ""
215
216         # lock to prevent concurrent applications on the same node,
217         # to execute commands at the same time. There are potential
218         # concurrency issues when using SSH to a same host from 
219         # multiple threads. There are also possible operational 
220         # issues, e.g. an application querying the existence 
221         # of a file or folder prior to its creation, and another 
222         # application creating the same file or folder in between.
223         self._node_lock = threading.Lock()
224     
225     def log_message(self, msg):
226         return " guid %d - host %s - %s " % (self.guid, 
227                 self.get("hostname"), msg)
228
229     @property
230     def home_dir(self):
231         home = self.get("home") or ""
232         if not home.startswith("/"):
233            home = os.path.join(self._home_dir, home) 
234         return home
235
236     @property
237     def usr_dir(self):
238         return os.path.join(self.home_dir, "nepi-usr")
239
240     @property
241     def lib_dir(self):
242         return os.path.join(self.usr_dir, "lib")
243
244     @property
245     def bin_dir(self):
246         return os.path.join(self.usr_dir, "bin")
247
248     @property
249     def src_dir(self):
250         return os.path.join(self.usr_dir, "src")
251
252     @property
253     def share_dir(self):
254         return os.path.join(self.usr_dir, "share")
255
256     @property
257     def exp_dir(self):
258         return os.path.join(self.home_dir, "nepi-exp")
259
260     @property
261     def exp_home(self):
262         return os.path.join(self.exp_dir, self.ec.exp_id)
263
264     @property
265     def node_home(self):
266         return os.path.join(self.exp_home, "node-%d" % self.guid)
267
268     @property
269     def run_home(self):
270         return os.path.join(self.node_home, self.ec.run_id)
271
272     @property
273     def os(self):
274         if self._os:
275             return self._os
276
277         if (not self.get("hostname") or not self.get("username")):
278             msg = "Can't resolve OS, insufficient data "
279             self.error(msg)
280             raise RuntimeError, msg
281
282         out = self.get_os()
283
284         if out.find("Fedora release 8") == 0:
285             self._os = OSType.FEDORA_8
286         elif out.find("Fedora release 12") == 0:
287             self._os = OSType.FEDORA_12
288         elif out.find("Fedora release 14") == 0:
289             self._os = OSType.FEDORA_14
290         elif out.find("Fedora release") == 0:
291             self._os = OSType.FEDORA
292         elif out.find("Debian") == 0: 
293             self._os = OSType.DEBIAN
294         elif out.find("Ubuntu") ==0:
295             self._os = OSType.UBUNTU
296         else:
297             msg = "Unsupported OS"
298             self.error(msg, out)
299             raise RuntimeError, "%s - %s " %( msg, out )
300
301         return self._os
302
303     def get_os(self):
304         # The underlying SSH layer will sometimes return an empty
305         # output (even if the command was executed without errors).
306         # To work arround this, repeat the operation N times or
307         # until the result is not empty string
308         out = ""
309         try:
310             (out, err), proc = self.execute("cat /etc/issue", 
311                     with_lock = True,
312                     blocking = True)
313         except:
314             trace = traceback.format_exc()
315             msg = "Error detecting OS: %s " % trace
316             self.error(msg, out, err)
317         
318         return out
319
320     @property
321     def use_deb(self):
322         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
323
324     @property
325     def use_rpm(self):
326         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
327                 OSType.FEDORA]
328
329     @property
330     def localhost(self):
331         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
332
333     def do_provision(self):
334         # check if host is alive
335         if not self.is_alive():
336             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
337             self.error(msg)
338             raise RuntimeError, msg
339
340         self.find_home()
341
342         if self.get("cleanProcesses"):
343             self.clean_processes()
344
345         if self.get("cleanHome"):
346             self.clean_home()
347  
348         if self.get("cleanExperiment"):
349             self.clean_experiment()
350     
351         # Create shared directory structure
352         self.mkdir(self.lib_dir)
353         self.mkdir(self.bin_dir)
354         self.mkdir(self.src_dir)
355         self.mkdir(self.share_dir)
356
357         # Create experiment node home directory
358         self.mkdir(self.node_home)
359
360         super(LinuxNode, self).do_provision()
361
362     def do_deploy(self):
363         if self.state == ResourceState.NEW:
364             self.info("Deploying node")
365             self.do_discover()
366             self.do_provision()
367
368         # Node needs to wait until all associated interfaces are 
369         # ready before it can finalize deployment
370         from nepi.resources.linux.interface import LinuxInterface
371         ifaces = self.get_connected(LinuxInterface.get_rtype())
372         for iface in ifaces:
373             if iface.state < ResourceState.READY:
374                 self.ec.schedule(reschedule_delay, self.deploy)
375                 return 
376
377         super(LinuxNode, self).do_deploy()
378
379     def do_release(self):
380         rms = self.get_connected()
381         for rm in rms:
382             # Node needs to wait until all associated RMs are released
383             # before it can be released
384             if rm.state != ResourceState.RELEASED:
385                 self.ec.schedule(reschedule_delay, self.release)
386                 return 
387
388         tear_down = self.get("tearDown")
389         if tear_down:
390             self.execute(tear_down)
391
392         self.clean_processes()
393
394         super(LinuxNode, self).do_release()
395
396     def valid_connection(self, guid):
397         # TODO: Validate!
398         return True
399
400     def clean_processes(self):
401         self.info("Cleaning up processes")
402  
403         if self.get("username") != 'root':
404             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
405                 "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
406                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
407         else:
408             if self.state >= ResourceState.READY:
409                 import pickle
410                 pids = pickle.load(open("save.proc", "rb"))
411                 pids_temp = dict()
412                 ps_aux = "ps aux |awk '{print $2,$11}'"
413                 (out, err), proc = self.execute(ps_aux)
414                 for line in out.strip().split("\n"):
415                     parts = line.strip().split(" ")
416                     pids_temp[parts[0]] = parts[1]
417                 kill_pids = set(pids_temp.items()) - set(pids.items())
418                 kill_pids = ' '.join(dict(kill_pids).keys())
419
420                 cmd = ("killall tcpdump || /bin/true ; " +
421                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
422                     "kill %s || /bin/true ; " % kill_pids)
423             else:
424                 cmd = ("killall tcpdump || /bin/true ; " +
425                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
426
427         out = err = ""
428         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
429
430     def clean_home(self):
431         """ Cleans all NEPI related folders in the Linux host
432         """
433         self.info("Cleaning up home")
434         
435         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
436                 self.home_dir )
437
438         return self.execute(cmd, with_lock = True)
439
440     def clean_experiment(self):
441         """ Cleans all experiment related files in the Linux host.
442         It preserves NEPI files and folders that have a multi experiment
443         scope.
444         """
445         self.info("Cleaning up experiment files")
446         
447         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
448                 self.exp_dir,
449                 self.ec.exp_id )
450             
451         return self.execute(cmd, with_lock = True)
452
453     def execute(self, command,
454             sudo = False,
455             stdin = None, 
456             env = None,
457             tty = False,
458             forward_x11 = False,
459             timeout = None,
460             retry = 3,
461             err_on_timeout = True,
462             connect_timeout = 30,
463             strict_host_checking = False,
464             persistent = True,
465             blocking = True,
466             with_lock = False
467             ):
468         """ Notice that this invocation will block until the
469         execution finishes. If this is not the desired behavior,
470         use 'run' instead."""
471
472         if self.localhost:
473             (out, err), proc = execfuncs.lexec(command, 
474                     user = self.get("username"), # still problem with localhost
475                     sudo = sudo,
476                     stdin = stdin,
477                     env = env)
478         else:
479             if with_lock:
480                 with self._node_lock:
481                     (out, err), proc = sshfuncs.rexec(
482                         command, 
483                         host = self.get("hostname"),
484                         user = self.get("username"),
485                         port = self.get("port"),
486                         gwuser = self.get("gatewayUser"),
487                         gw = self.get("gateway"),
488                         agent = True,
489                         sudo = sudo,
490                         stdin = stdin,
491                         identity = self.get("identity"),
492                         server_key = self.get("serverKey"),
493                         env = env,
494                         tty = tty,
495                         forward_x11 = forward_x11,
496                         timeout = timeout,
497                         retry = retry,
498                         err_on_timeout = err_on_timeout,
499                         connect_timeout = connect_timeout,
500                         persistent = persistent,
501                         blocking = blocking, 
502                         strict_host_checking = strict_host_checking
503                         )
504             else:
505                 (out, err), proc = sshfuncs.rexec(
506                     command, 
507                     host = self.get("hostname"),
508                     user = self.get("username"),
509                     port = self.get("port"),
510                     gwuser = self.get("gatewayUser"),
511                     gw = self.get("gateway"),
512                     agent = True,
513                     sudo = sudo,
514                     stdin = stdin,
515                     identity = self.get("identity"),
516                     server_key = self.get("serverKey"),
517                     env = env,
518                     tty = tty,
519                     forward_x11 = forward_x11,
520                     timeout = timeout,
521                     retry = retry,
522                     err_on_timeout = err_on_timeout,
523                     connect_timeout = connect_timeout,
524                     persistent = persistent,
525                     blocking = blocking, 
526                     strict_host_checking = strict_host_checking
527                     )
528
529         return (out, err), proc
530
531     def run(self, command, home,
532             create_home = False,
533             pidfile = 'pidfile',
534             stdin = None, 
535             stdout = 'stdout', 
536             stderr = 'stderr', 
537             sudo = False,
538             tty = False):
539         
540         self.debug("Running command '%s'" % command)
541         
542         if self.localhost:
543             (out, err), proc = execfuncs.lspawn(command, pidfile, 
544                     stdout = stdout, 
545                     stderr = stderr, 
546                     stdin = stdin, 
547                     home = home, 
548                     create_home = create_home, 
549                     sudo = sudo,
550                     user = user) 
551         else:
552             with self._node_lock:
553                 (out, err), proc = sshfuncs.rspawn(
554                     command,
555                     pidfile = pidfile,
556                     home = home,
557                     create_home = create_home,
558                     stdin = stdin if stdin is not None else '/dev/null',
559                     stdout = stdout if stdout else '/dev/null',
560                     stderr = stderr if stderr else '/dev/null',
561                     sudo = sudo,
562                     host = self.get("hostname"),
563                     user = self.get("username"),
564                     port = self.get("port"),
565                     gwuser = self.get("gatewayUser"),
566                     gw = self.get("gateway"),
567                     agent = True,
568                     identity = self.get("identity"),
569                     server_key = self.get("serverKey"),
570                     tty = tty
571                     )
572
573         return (out, err), proc
574
575     def getpid(self, home, pidfile = "pidfile"):
576         if self.localhost:
577             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
578         else:
579             with self._node_lock:
580                 pidtuple = sshfuncs.rgetpid(
581                     os.path.join(home, pidfile),
582                     host = self.get("hostname"),
583                     user = self.get("username"),
584                     port = self.get("port"),
585                     gwuser = self.get("gatewayUser"),
586                     gw = self.get("gateway"),
587                     agent = True,
588                     identity = self.get("identity"),
589                     server_key = self.get("serverKey")
590                     )
591         
592         return pidtuple
593
594     def status(self, pid, ppid):
595         if self.localhost:
596             status = execfuncs.lstatus(pid, ppid)
597         else:
598             with self._node_lock:
599                 status = sshfuncs.rstatus(
600                         pid, ppid,
601                         host = self.get("hostname"),
602                         user = self.get("username"),
603                         port = self.get("port"),
604                         gwuser = self.get("gatewayUser"),
605                         gw = self.get("gateway"),
606                         agent = True,
607                         identity = self.get("identity"),
608                         server_key = self.get("serverKey")
609                         )
610            
611         return status
612     
613     def kill(self, pid, ppid, sudo = False):
614         out = err = ""
615         proc = None
616         status = self.status(pid, ppid)
617
618         if status == sshfuncs.ProcStatus.RUNNING:
619             if self.localhost:
620                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
621             else:
622                 with self._node_lock:
623                     (out, err), proc = sshfuncs.rkill(
624                         pid, ppid,
625                         host = self.get("hostname"),
626                         user = self.get("username"),
627                         port = self.get("port"),
628                         gwuser = self.get("gatewayUser"),
629                         gw = self.get("gateway"),
630                         agent = True,
631                         sudo = sudo,
632                         identity = self.get("identity"),
633                         server_key = self.get("serverKey")
634                         )
635
636         return (out, err), proc
637
638     def copy(self, src, dst):
639         if self.localhost:
640             (out, err), proc = execfuncs.lcopy(source, dest, 
641                     recursive = True,
642                     strict_host_checking = False)
643         else:
644             with self._node_lock:
645                 (out, err), proc = sshfuncs.rcopy(
646                     src, dst, 
647                     port = self.get("port"),
648                     gwuser = self.get("gatewayUser"),
649                     gw = self.get("gateway"),
650                     identity = self.get("identity"),
651                     server_key = self.get("serverKey"),
652                     recursive = True,
653                     strict_host_checking = False)
654
655         return (out, err), proc
656
657     def upload(self, src, dst, text = False, overwrite = True):
658         """ Copy content to destination
659
660            src  content to copy. Can be a local file, directory or a list of files
661
662            dst  destination path on the remote host (remote is always self.host)
663
664            text src is text input, it must be stored into a temp file before uploading
665         """
666         # If source is a string input 
667         f = None
668         if text and not os.path.isfile(src):
669             # src is text input that should be uploaded as file
670             # create a temporal file with the content to upload
671             f = tempfile.NamedTemporaryFile(delete=False)
672             f.write(src)
673             f.close()
674             src = f.name
675
676         # If dst files should not be overwritten, check that the files do not
677         # exits already 
678         if overwrite == False:
679             src = self.filter_existing_files(src, dst)
680             if not src:
681                 return ("", ""), None 
682
683         if not self.localhost:
684             # Build destination as <user>@<server>:<path>
685             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
686
687         result = self.copy(src, dst)
688
689         # clean up temp file
690         if f:
691             os.remove(f.name)
692
693         return result
694
695     def download(self, src, dst):
696         if not self.localhost:
697             # Build destination as <user>@<server>:<path>
698             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
699         return self.copy(src, dst)
700
701     def install_packages_command(self, packages):
702         command = ""
703         if self.use_rpm:
704             command = rpmfuncs.install_packages_command(self.os, packages)
705         elif self.use_deb:
706             command = debfuncs.install_packages_command(self.os, packages)
707         else:
708             msg = "Error installing packages ( OS not known ) "
709             self.error(msg, self.os)
710             raise RuntimeError, msg
711
712         return command
713
714     def install_packages(self, packages, home, run_home = None):
715         """ Install packages in the Linux host.
716
717         'home' is the directory to upload the package installation script.
718         'run_home' is the directory from where to execute the script.
719         """
720         command = self.install_packages_command(packages)
721
722         run_home = run_home or home
723
724         (out, err), proc = self.run_and_wait(command, run_home, 
725             shfile = os.path.join(home, "instpkg.sh"),
726             pidfile = "instpkg_pidfile",
727             ecodefile = "instpkg_exitcode",
728             stdout = "instpkg_stdout", 
729             stderr = "instpkg_stderr",
730             overwrite = False,
731             raise_on_error = True)
732
733         return (out, err), proc 
734
735     def remove_packages(self, packages, home, run_home = None):
736         """ Uninstall packages from the Linux host.
737
738         'home' is the directory to upload the package un-installation script.
739         'run_home' is the directory from where to execute the script.
740         """
741         if self.use_rpm:
742             command = rpmfuncs.remove_packages_command(self.os, packages)
743         elif self.use_deb:
744             command = debfuncs.remove_packages_command(self.os, packages)
745         else:
746             msg = "Error removing packages ( OS not known ) "
747             self.error(msg)
748             raise RuntimeError, msg
749
750         run_home = run_home or home
751
752         (out, err), proc = self.run_and_wait(command, run_home, 
753             shfile = os.path.join(home, "rmpkg.sh"),
754             pidfile = "rmpkg_pidfile",
755             ecodefile = "rmpkg_exitcode",
756             stdout = "rmpkg_stdout", 
757             stderr = "rmpkg_stderr",
758             overwrite = False,
759             raise_on_error = True)
760          
761         return (out, err), proc 
762
763     def mkdir(self, path, clean = False):
764         if clean:
765             self.rmdir(path)
766
767         return self.execute("mkdir -p %s" % path, with_lock = True)
768
769     def rmdir(self, path):
770         return self.execute("rm -rf %s" % path, with_lock = True)
771         
772     def run_and_wait(self, command, home, 
773             shfile = "cmd.sh",
774             env = None,
775             overwrite = True,
776             pidfile = "pidfile", 
777             ecodefile = "exitcode", 
778             stdin = None, 
779             stdout = "stdout", 
780             stderr = "stderr", 
781             sudo = False,
782             tty = False,
783             raise_on_error = False):
784         """
785         Uploads the 'command' to a bash script in the host.
786         Then runs the script detached in background in the host, and
787         busy-waites until the script finishes executing.
788         """
789
790         if not shfile.startswith("/"):
791             shfile = os.path.join(home, shfile)
792
793         self.upload_command(command, 
794             shfile = shfile, 
795             ecodefile = ecodefile, 
796             env = env,
797             overwrite = overwrite)
798
799         command = "bash %s" % shfile
800         # run command in background in remote host
801         (out, err), proc = self.run(command, home, 
802                 pidfile = pidfile,
803                 stdin = stdin, 
804                 stdout = stdout, 
805                 stderr = stderr, 
806                 sudo = sudo,
807                 tty = tty)
808
809         # check no errors occurred
810         if proc.poll():
811             msg = " Failed to run command '%s' " % command
812             self.error(msg, out, err)
813             if raise_on_error:
814                 raise RuntimeError, msg
815
816         # Wait for pid file to be generated
817         pid, ppid = self.wait_pid(
818                 home = home, 
819                 pidfile = pidfile, 
820                 raise_on_error = raise_on_error)
821
822         # wait until command finishes to execute
823         self.wait_run(pid, ppid)
824       
825         (eout, err), proc = self.check_errors(home,
826             ecodefile = ecodefile,
827             stderr = stderr)
828
829         # Out is what was written in the stderr file
830         if err:
831             msg = " Failed to run command '%s' " % command
832             self.error(msg, eout, err)
833
834             if raise_on_error:
835                 raise RuntimeError, msg
836
837         (out, oerr), proc = self.check_output(home, stdout)
838         
839         return (out, err), proc
840
841     def exitcode(self, home, ecodefile = "exitcode"):
842         """
843         Get the exit code of an application.
844         Returns an integer value with the exit code 
845         """
846         (out, err), proc = self.check_output(home, ecodefile)
847
848         # Succeeded to open file, return exit code in the file
849         if proc.wait() == 0:
850             try:
851                 return int(out.strip())
852             except:
853                 # Error in the content of the file!
854                 return ExitCode.CORRUPTFILE
855
856         # No such file or directory
857         if proc.returncode == 1:
858             return ExitCode.FILENOTFOUND
859         
860         # Other error from 'cat'
861         return ExitCode.ERROR
862
863     def upload_command(self, command, 
864             shfile = "cmd.sh",
865             ecodefile = "exitcode",
866             overwrite = True,
867             env = None):
868         """ Saves the command as a bash script file in the remote host, and
869         forces to save the exit code of the command execution to the ecodefile
870         """
871
872         if not (command.strip().endswith(";") or command.strip().endswith("&")):
873             command += ";"
874       
875         # The exit code of the command will be stored in ecodefile
876         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
877                 'command': command,
878                 'ecodefile': ecodefile,
879                 } 
880
881         # Export environment
882         environ = self.format_environment(env)
883
884         # Add environ to command
885         command = environ + command
886
887         return self.upload(command, shfile, text = True, overwrite = overwrite)
888
889     def format_environment(self, env, inline = False):
890         """ Formats the environment variables for a command to be executed
891         either as an inline command
892         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
893         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
894         """
895         if not env: return ""
896
897         # Remove extra white spaces
898         env = re.sub(r'\s+', ' ', env.strip())
899
900         sep = ";" if inline else "\n"
901         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
902
903     def check_errors(self, home, 
904             ecodefile = "exitcode", 
905             stderr = "stderr"):
906         """ Checks whether errors occurred while running a command.
907         It first checks the exit code for the command, and only if the
908         exit code is an error one it returns the error output.
909
910         """
911         proc = None
912         err = ""
913
914         # get exit code saved in the 'exitcode' file
915         ecode = self.exitcode(home, ecodefile)
916
917         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
918             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
919         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
920             # The process returned an error code or didn't exist. 
921             # Check standard error.
922             (err, eerr), proc = self.check_output(home, stderr)
923
924             # If the stderr file was not found, assume nothing bad happened,
925             # and just ignore the error.
926             # (cat returns 1 for error "No such file or directory")
927             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
928                 err = "" 
929             
930         return ("", err), proc
931  
932     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
933         """ Waits until the pid file for the command is generated, 
934             and returns the pid and ppid of the process """
935         pid = ppid = None
936         delay = 1.0
937
938         for i in xrange(2):
939             pidtuple = self.getpid(home = home, pidfile = pidfile)
940             
941             if pidtuple:
942                 pid, ppid = pidtuple
943                 break
944             else:
945                 time.sleep(delay)
946                 delay = delay * 1.5
947         else:
948             msg = " Failed to get pid for pidfile %s/%s " % (
949                     home, pidfile )
950             self.error(msg)
951             
952             if raise_on_error:
953                 raise RuntimeError, msg
954
955         return pid, ppid
956
957     def wait_run(self, pid, ppid, trial = 0):
958         """ wait for a remote process to finish execution """
959         delay = 1.0
960
961         while True:
962             status = self.status(pid, ppid)
963             
964             if status is ProcStatus.FINISHED:
965                 break
966             elif status is not ProcStatus.RUNNING:
967                 delay = delay * 1.5
968                 time.sleep(delay)
969                 # If it takes more than 20 seconds to start, then
970                 # asume something went wrong
971                 if delay > 20:
972                     break
973             else:
974                 # The app is running, just wait...
975                 time.sleep(0.5)
976
977     def check_output(self, home, filename):
978         """ Retrives content of file """
979         (out, err), proc = self.execute("cat %s" % 
980             os.path.join(home, filename), retry = 1, with_lock = True)
981         return (out, err), proc
982
983     def is_alive(self):
984         """ Checks if host is responsive
985         """
986         if self.localhost:
987             return True
988
989         out = err = ""
990         msg = "Unresponsive host. Wrong answer. "
991
992         # The underlying SSH layer will sometimes return an empty
993         # output (even if the command was executed without errors).
994         # To work arround this, repeat the operation N times or
995         # until the result is not empty string
996         try:
997             (out, err), proc = self.execute("echo 'ALIVE'",
998                     blocking = True,
999                     with_lock = True)
1000     
1001             if out.find("ALIVE") > -1:
1002                 return True
1003         except:
1004             trace = traceback.format_exc()
1005             msg = "Unresponsive host. Error reaching host: %s " % trace
1006
1007         self.error(msg, out, err)
1008         return False
1009
1010     def find_home(self):
1011         """ Retrieves host home directory
1012         """
1013         # The underlying SSH layer will sometimes return an empty
1014         # output (even if the command was executed without errors).
1015         # To work arround this, repeat the operation N times or
1016         # until the result is not empty string
1017         msg = "Impossible to retrieve HOME directory"
1018         try:
1019             (out, err), proc = self.execute("echo ${HOME}",
1020                     blocking = True,
1021                     with_lock = True)
1022     
1023             if out.strip() != "":
1024                 self._home_dir =  out.strip()
1025         except:
1026             trace = traceback.format_exc()
1027             msg = "Impossible to retrieve HOME directory %s" % trace
1028
1029         if not self._home_dir:
1030             self.error(msg)
1031             raise RuntimeError, msg
1032
1033     def filter_existing_files(self, src, dst):
1034         """ Removes files that already exist in the Linux host from src list
1035         """
1036         # construct a dictionary with { dst: src }
1037         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
1038             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1039
1040         command = []
1041         for d in dests.keys():
1042             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1043
1044         command = ";".join(command)
1045
1046         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1047     
1048         for d in dests.keys():
1049             if out.find(d) > -1:
1050                 del dests[d]
1051
1052         if not dests:
1053             return ""
1054
1055         return " ".join(dests.values())
1056