Fixing RM.DEPLOY being executed after/during RM.RELEASE by adding a release_lock...
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.ExecReadOnly)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.ExecReadOnly)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.ExecReadOnly)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.ExecReadOnly)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.ExecReadOnly)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.ExecReadOnly)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.ExecReadOnly)
190
191         cls._register_attribute(hostname)
192         cls._register_attribute(username)
193         cls._register_attribute(port)
194         cls._register_attribute(home)
195         cls._register_attribute(identity)
196         cls._register_attribute(server_key)
197         cls._register_attribute(clean_home)
198         cls._register_attribute(clean_experiment)
199         cls._register_attribute(clean_processes)
200         cls._register_attribute(tear_down)
201
202     def __init__(self, ec, guid):
203         super(LinuxNode, self).__init__(ec, guid)
204         self._os = None
205         # home directory at Linux host
206         self._home_dir = ""
207         
208         # lock to prevent concurrent applications on the same node,
209         # to execute commands at the same time. There are potential
210         # concurrency issues when using SSH to a same host from 
211         # multiple threads. There are also possible operational 
212         # issues, e.g. an application querying the existence 
213         # of a file or folder prior to its creation, and another 
214         # application creating the same file or folder in between.
215         self._node_lock = threading.Lock()
216     
217     def log_message(self, msg):
218         return " guid %d - host %s - %s " % (self.guid, 
219                 self.get("hostname"), msg)
220
221     @property
222     def home_dir(self):
223         home = self.get("home") or ""
224         if not home.startswith("/"):
225            home = os.path.join(self._home_dir, home) 
226         return home
227
228     @property
229     def usr_dir(self):
230         return os.path.join(self.home_dir, "nepi-usr")
231
232     @property
233     def lib_dir(self):
234         return os.path.join(self.usr_dir, "lib")
235
236     @property
237     def bin_dir(self):
238         return os.path.join(self.usr_dir, "bin")
239
240     @property
241     def src_dir(self):
242         return os.path.join(self.usr_dir, "src")
243
244     @property
245     def share_dir(self):
246         return os.path.join(self.usr_dir, "share")
247
248     @property
249     def exp_dir(self):
250         return os.path.join(self.home_dir, "nepi-exp")
251
252     @property
253     def exp_home(self):
254         return os.path.join(self.exp_dir, self.ec.exp_id)
255
256     @property
257     def node_home(self):
258         return os.path.join(self.exp_home, "node-%d" % self.guid)
259
260     @property
261     def run_home(self):
262         return os.path.join(self.node_home, self.ec.run_id)
263
264     @property
265     def os(self):
266         if self._os:
267             return self._os
268
269         if (not self.get("hostname") or not self.get("username")):
270             msg = "Can't resolve OS, insufficient data "
271             self.error(msg)
272             raise RuntimeError, msg
273
274         out = self.get_os()
275
276         if out.find("Fedora release 8") == 0:
277             self._os = OSType.FEDORA_8
278         elif out.find("Fedora release 12") == 0:
279             self._os = OSType.FEDORA_12
280         elif out.find("Fedora release 14") == 0:
281             self._os = OSType.FEDORA_14
282         elif out.find("Fedora release") == 0:
283             self._os = OSType.FEDORA
284         elif out.find("Debian") == 0: 
285             self._os = OSType.DEBIAN
286         elif out.find("Ubuntu") ==0:
287             self._os = OSType.UBUNTU
288         else:
289             msg = "Unsupported OS"
290             self.error(msg, out)
291             raise RuntimeError, "%s - %s " %( msg, out )
292
293         return self._os
294
295     def get_os(self):
296         # The underlying SSH layer will sometimes return an empty
297         # output (even if the command was executed without errors).
298         # To work arround this, repeat the operation N times or
299         # until the result is not empty string
300         out = ""
301         retrydelay = 1.0
302         for i in xrange(2):
303             try:
304                 (out, err), proc = self.execute("cat /etc/issue", 
305                         retry = 5,
306                         with_lock = True,
307                         blocking = True)
308
309                 if out.strip() != "":
310                     return out
311             except:
312                 trace = traceback.format_exc()
313                 msg = "Error detecting OS: %s " % trace
314                 self.error(msg, out, err)
315                 return False
316
317             time.sleep(min(30.0, retrydelay))
318             retrydelay *= 1.5
319
320     @property
321     def use_deb(self):
322         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
323
324     @property
325     def use_rpm(self):
326         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
327                 OSType.FEDORA]
328
329     @property
330     def localhost(self):
331         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
332
333     def do_provision(self):
334         # check if host is alive
335         if not self.is_alive():
336             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
337             self.error(msg)
338             raise RuntimeError, msg
339
340         self.find_home()
341
342         if self.get("cleanProcesses"):
343             self.clean_processes()
344
345         if self.get("cleanHome"):
346             self.clean_home()
347  
348         if self.get("cleanExperiment"):
349             self.clean_experiment()
350     
351         # Create shared directory structure
352         self.mkdir(self.lib_dir)
353         self.mkdir(self.bin_dir)
354         self.mkdir(self.src_dir)
355         self.mkdir(self.share_dir)
356
357         # Create experiment node home directory
358         self.mkdir(self.node_home)
359
360         super(LinuxNode, self).do_provision()
361
362     def do_deploy(self):
363         if self.state == ResourceState.NEW:
364             self.info("Deploying node")
365             self.do_discover()
366             self.do_provision()
367
368         # Node needs to wait until all associated interfaces are 
369         # ready before it can finalize deployment
370         from nepi.resources.linux.interface import LinuxInterface
371         ifaces = self.get_connected(LinuxInterface.rtype())
372         for iface in ifaces:
373             if iface.state < ResourceState.READY:
374                 self.ec.schedule(reschedule_delay, self.deploy)
375                 return 
376
377         super(LinuxNode, self).do_deploy()
378
379     def do_release(self):
380         rms = self.get_connected()
381         for rm in rms:
382             # Node needs to wait until all associated RMs are released
383             # before it can be released
384             if rm.state != ResourceState.RELEASED:
385                 self.ec.schedule(reschedule_delay, self.release)
386                 return 
387
388         tear_down = self.get("tearDown")
389         if tear_down:
390             self.execute(tear_down)
391
392         self.clean_processes()
393
394         super(LinuxNode, self).do_release()
395
396     def valid_connection(self, guid):
397         # TODO: Validate!
398         return True
399
400     def clean_processes(self, killer = False):
401         self.info("Cleaning up processes")
402         
403         if killer:
404             # Hardcore kill
405             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
406                 "sudo -S killall python tcpdump || /bin/true ; " +
407                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
408                 "sudo -S killall -u root || /bin/true ; " +
409                 "sudo -S killall -u root || /bin/true ; ")
410         else:
411             # Be gentler...
412             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
413                 "sudo -S killall tcpdump || /bin/true ; " +
414                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
415                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
416
417         out = err = ""
418         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
419
420     def clean_home(self):
421         """ Cleans all NEPI related folders in the Linux host
422         """
423         self.info("Cleaning up home")
424         
425         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
426                 self.home_dir )
427
428         return self.execute(cmd, with_lock = True)
429
430     def clean_experiment(self):
431         """ Cleans all experiment related files in the Linux host.
432         It preserves NEPI files and folders that have a multi experiment
433         scope.
434         """
435         self.info("Cleaning up experiment files")
436         
437         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
438                 self.exp_dir,
439                 self.ec.exp_id )
440             
441         return self.execute(cmd, with_lock = True)
442
443     def execute(self, command,
444             sudo = False,
445             stdin = None, 
446             env = None,
447             tty = False,
448             forward_x11 = False,
449             timeout = None,
450             retry = 3,
451             err_on_timeout = True,
452             connect_timeout = 30,
453             strict_host_checking = False,
454             persistent = True,
455             blocking = True,
456             with_lock = False
457             ):
458         """ Notice that this invocation will block until the
459         execution finishes. If this is not the desired behavior,
460         use 'run' instead."""
461
462         if self.localhost:
463             (out, err), proc = execfuncs.lexec(command, 
464                     user = user,
465                     sudo = sudo,
466                     stdin = stdin,
467                     env = env)
468         else:
469             if with_lock:
470                 with self._node_lock:
471                     (out, err), proc = sshfuncs.rexec(
472                         command, 
473                         host = self.get("hostname"),
474                         user = self.get("username"),
475                         port = self.get("port"),
476                         agent = True,
477                         sudo = sudo,
478                         stdin = stdin,
479                         identity = self.get("identity"),
480                         server_key = self.get("serverKey"),
481                         env = env,
482                         tty = tty,
483                         forward_x11 = forward_x11,
484                         timeout = timeout,
485                         retry = retry,
486                         err_on_timeout = err_on_timeout,
487                         connect_timeout = connect_timeout,
488                         persistent = persistent,
489                         blocking = blocking, 
490                         strict_host_checking = strict_host_checking
491                         )
492             else:
493                 (out, err), proc = sshfuncs.rexec(
494                     command, 
495                     host = self.get("hostname"),
496                     user = self.get("username"),
497                     port = self.get("port"),
498                     agent = True,
499                     sudo = sudo,
500                     stdin = stdin,
501                     identity = self.get("identity"),
502                     server_key = self.get("serverKey"),
503                     env = env,
504                     tty = tty,
505                     forward_x11 = forward_x11,
506                     timeout = timeout,
507                     retry = retry,
508                     err_on_timeout = err_on_timeout,
509                     connect_timeout = connect_timeout,
510                     persistent = persistent,
511                     blocking = blocking, 
512                     strict_host_checking = strict_host_checking
513                     )
514
515         return (out, err), proc
516
517     def run(self, command, home,
518             create_home = False,
519             pidfile = 'pidfile',
520             stdin = None, 
521             stdout = 'stdout', 
522             stderr = 'stderr', 
523             sudo = False,
524             tty = False):
525         
526         self.debug("Running command '%s'" % command)
527         
528         if self.localhost:
529             (out, err), proc = execfuncs.lspawn(command, pidfile, 
530                     stdout = stdout, 
531                     stderr = stderr, 
532                     stdin = stdin, 
533                     home = home, 
534                     create_home = create_home, 
535                     sudo = sudo,
536                     user = user) 
537         else:
538             with self._node_lock:
539                 (out, err), proc = sshfuncs.rspawn(
540                     command,
541                     pidfile = pidfile,
542                     home = home,
543                     create_home = create_home,
544                     stdin = stdin if stdin is not None else '/dev/null',
545                     stdout = stdout if stdout else '/dev/null',
546                     stderr = stderr if stderr else '/dev/null',
547                     sudo = sudo,
548                     host = self.get("hostname"),
549                     user = self.get("username"),
550                     port = self.get("port"),
551                     agent = True,
552                     identity = self.get("identity"),
553                     server_key = self.get("serverKey"),
554                     tty = tty
555                     )
556
557         return (out, err), proc
558
559     def getpid(self, home, pidfile = "pidfile"):
560         if self.localhost:
561             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
562         else:
563             with self._node_lock:
564                 pidtuple = sshfuncs.rgetpid(
565                     os.path.join(home, pidfile),
566                     host = self.get("hostname"),
567                     user = self.get("username"),
568                     port = self.get("port"),
569                     agent = True,
570                     identity = self.get("identity"),
571                     server_key = self.get("serverKey")
572                     )
573         
574         return pidtuple
575
576     def status(self, pid, ppid):
577         if self.localhost:
578             status = execfuncs.lstatus(pid, ppid)
579         else:
580             with self._node_lock:
581                 status = sshfuncs.rstatus(
582                         pid, ppid,
583                         host = self.get("hostname"),
584                         user = self.get("username"),
585                         port = self.get("port"),
586                         agent = True,
587                         identity = self.get("identity"),
588                         server_key = self.get("serverKey")
589                         )
590            
591         return status
592     
593     def kill(self, pid, ppid, sudo = False):
594         out = err = ""
595         proc = None
596         status = self.status(pid, ppid)
597
598         if status == sshfuncs.ProcStatus.RUNNING:
599             if self.localhost:
600                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
601             else:
602                 with self._node_lock:
603                     (out, err), proc = sshfuncs.rkill(
604                         pid, ppid,
605                         host = self.get("hostname"),
606                         user = self.get("username"),
607                         port = self.get("port"),
608                         agent = True,
609                         sudo = sudo,
610                         identity = self.get("identity"),
611                         server_key = self.get("serverKey")
612                         )
613
614         return (out, err), proc
615
616     def copy(self, src, dst):
617         if self.localhost:
618             (out, err), proc = execfuncs.lcopy(source, dest, 
619                     recursive = True,
620                     strict_host_checking = False)
621         else:
622             with self._node_lock:
623                 (out, err), proc = sshfuncs.rcopy(
624                     src, dst, 
625                     port = self.get("port"),
626                     identity = self.get("identity"),
627                     server_key = self.get("serverKey"),
628                     recursive = True,
629                     strict_host_checking = False)
630
631         return (out, err), proc
632
633     def upload(self, src, dst, text = False, overwrite = True):
634         """ Copy content to destination
635
636            src  content to copy. Can be a local file, directory or a list of files
637
638            dst  destination path on the remote host (remote is always self.host)
639
640            text src is text input, it must be stored into a temp file before uploading
641         """
642         # If source is a string input 
643         f = None
644         if text and not os.path.isfile(src):
645             # src is text input that should be uploaded as file
646             # create a temporal file with the content to upload
647             f = tempfile.NamedTemporaryFile(delete=False)
648             f.write(src)
649             f.close()
650             src = f.name
651
652         # If dst files should not be overwritten, check that the files do not
653         # exits already 
654         if overwrite == False:
655             src = self.filter_existing_files(src, dst)
656             if not src:
657                 return ("", ""), None 
658
659         if not self.localhost:
660             # Build destination as <user>@<server>:<path>
661             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
662
663         result = self.copy(src, dst)
664
665         # clean up temp file
666         if f:
667             os.remove(f.name)
668
669         return result
670
671     def download(self, src, dst):
672         if not self.localhost:
673             # Build destination as <user>@<server>:<path>
674             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
675         return self.copy(src, dst)
676
677     def install_packages_command(self, packages):
678         command = ""
679         if self.use_rpm:
680             command = rpmfuncs.install_packages_command(self.os, packages)
681         elif self.use_deb:
682             command = debfuncs.install_packages_command(self.os, packages)
683         else:
684             msg = "Error installing packages ( OS not known ) "
685             self.error(msg, self.os)
686             raise RuntimeError, msg
687
688         return command
689
690     def install_packages(self, packages, home, run_home = None):
691         """ Install packages in the Linux host.
692
693         'home' is the directory to upload the package installation script.
694         'run_home' is the directory from where to execute the script.
695         """
696         command = self.install_packages_command(packages)
697
698         run_home = run_home or home
699
700         (out, err), proc = self.run_and_wait(command, run_home, 
701             shfile = os.path.join(home, "instpkg.sh"),
702             pidfile = "instpkg_pidfile",
703             ecodefile = "instpkg_exitcode",
704             stdout = "instpkg_stdout", 
705             stderr = "instpkg_stderr",
706             overwrite = False,
707             raise_on_error = True)
708
709         return (out, err), proc 
710
711     def remove_packages(self, packages, home, run_home = None):
712         """ Uninstall packages from the Linux host.
713
714         'home' is the directory to upload the package un-installation script.
715         'run_home' is the directory from where to execute the script.
716         """
717         if self.use_rpm:
718             command = rpmfuncs.remove_packages_command(self.os, packages)
719         elif self.use_deb:
720             command = debfuncs.remove_packages_command(self.os, packages)
721         else:
722             msg = "Error removing packages ( OS not known ) "
723             self.error(msg)
724             raise RuntimeError, msg
725
726         run_home = run_home or home
727
728         (out, err), proc = self.run_and_wait(command, run_home, 
729             shfile = os.path.join(home, "rmpkg.sh"),
730             pidfile = "rmpkg_pidfile",
731             ecodefile = "rmpkg_exitcode",
732             stdout = "rmpkg_stdout", 
733             stderr = "rmpkg_stderr",
734             overwrite = False,
735             raise_on_error = True)
736          
737         return (out, err), proc 
738
739     def mkdir(self, path, clean = False):
740         if clean:
741             self.rmdir(path)
742
743         return self.execute("mkdir -p %s" % path, with_lock = True)
744
745     def rmdir(self, path):
746         return self.execute("rm -rf %s" % path, with_lock = True)
747         
748     def run_and_wait(self, command, home, 
749             shfile = "cmd.sh",
750             env = None,
751             overwrite = True,
752             pidfile = "pidfile", 
753             ecodefile = "exitcode", 
754             stdin = None, 
755             stdout = "stdout", 
756             stderr = "stderr", 
757             sudo = False,
758             tty = False,
759             raise_on_error = False):
760         """
761         Uploads the 'command' to a bash script in the host.
762         Then runs the script detached in background in the host, and
763         busy-waites until the script finishes executing.
764         """
765
766         if not shfile.startswith("/"):
767             shfile = os.path.join(home, shfile)
768
769         self.upload_command(command, 
770             shfile = shfile, 
771             ecodefile = ecodefile, 
772             env = env,
773             overwrite = overwrite)
774
775         command = "bash %s" % shfile
776         # run command in background in remote host
777         (out, err), proc = self.run(command, home, 
778                 pidfile = pidfile,
779                 stdin = stdin, 
780                 stdout = stdout, 
781                 stderr = stderr, 
782                 sudo = sudo,
783                 tty = tty)
784
785         # check no errors occurred
786         if proc.poll():
787             msg = " Failed to run command '%s' " % command
788             self.error(msg, out, err)
789             if raise_on_error:
790                 raise RuntimeError, msg
791
792         # Wait for pid file to be generated
793         pid, ppid = self.wait_pid(
794                 home = home, 
795                 pidfile = pidfile, 
796                 raise_on_error = raise_on_error)
797
798         # wait until command finishes to execute
799         self.wait_run(pid, ppid)
800       
801         (eout, err), proc = self.check_errors(home,
802             ecodefile = ecodefile,
803             stderr = stderr)
804
805         # Out is what was written in the stderr file
806         if err:
807             msg = " Failed to run command '%s' " % command
808             self.error(msg, eout, err)
809
810             if raise_on_error:
811                 raise RuntimeError, msg
812
813         (out, oerr), proc = self.check_output(home, stdout)
814         
815         return (out, err), proc
816
817     def exitcode(self, home, ecodefile = "exitcode"):
818         """
819         Get the exit code of an application.
820         Returns an integer value with the exit code 
821         """
822         (out, err), proc = self.check_output(home, ecodefile)
823
824         # Succeeded to open file, return exit code in the file
825         if proc.wait() == 0:
826             try:
827                 return int(out.strip())
828             except:
829                 # Error in the content of the file!
830                 return ExitCode.CORRUPTFILE
831
832         # No such file or directory
833         if proc.returncode == 1:
834             return ExitCode.FILENOTFOUND
835         
836         # Other error from 'cat'
837         return ExitCode.ERROR
838
839     def upload_command(self, command, 
840             shfile = "cmd.sh",
841             ecodefile = "exitcode",
842             overwrite = True,
843             env = None):
844         """ Saves the command as a bash script file in the remote host, and
845         forces to save the exit code of the command execution to the ecodefile
846         """
847
848         if not (command.strip().endswith(";") or command.strip().endswith("&")):
849             command += ";"
850       
851         # The exit code of the command will be stored in ecodefile
852         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
853                 'command': command,
854                 'ecodefile': ecodefile,
855                 } 
856
857         # Export environment
858         environ = self.format_environment(env)
859
860         # Add environ to command
861         command = environ + command
862
863         return self.upload(command, shfile, text = True, overwrite = overwrite)
864
865     def format_environment(self, env, inline = False):
866         """ Formats the environment variables for a command to be executed
867         either as an inline command
868         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
869         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
870         """
871         if not env: return ""
872
873         # Remove extra white spaces
874         env = re.sub(r'\s+', ' ', env.strip())
875
876         sep = ";" if inline else "\n"
877         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
878
879     def check_errors(self, home, 
880             ecodefile = "exitcode", 
881             stderr = "stderr"):
882         """ Checks whether errors occurred while running a command.
883         It first checks the exit code for the command, and only if the
884         exit code is an error one it returns the error output.
885
886         """
887         proc = None
888         err = ""
889
890         # get exit code saved in the 'exitcode' file
891         ecode = self.exitcode(home, ecodefile)
892
893         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
894             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
895         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
896             # The process returned an error code or didn't exist. 
897             # Check standard error.
898             (err, eerr), proc = self.check_output(home, stderr)
899
900             # If the stderr file was not found, assume nothing bad happened,
901             # and just ignore the error.
902             # (cat returns 1 for error "No such file or directory")
903             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
904                 err = "" 
905             
906         return ("", err), proc
907  
908     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
909         """ Waits until the pid file for the command is generated, 
910             and returns the pid and ppid of the process """
911         pid = ppid = None
912         delay = 1.0
913
914         for i in xrange(2):
915             pidtuple = self.getpid(home = home, pidfile = pidfile)
916             
917             if pidtuple:
918                 pid, ppid = pidtuple
919                 break
920             else:
921                 time.sleep(delay)
922                 delay = delay * 1.5
923         else:
924             msg = " Failed to get pid for pidfile %s/%s " % (
925                     home, pidfile )
926             self.error(msg)
927             
928             if raise_on_error:
929                 raise RuntimeError, msg
930
931         return pid, ppid
932
933     def wait_run(self, pid, ppid, trial = 0):
934         """ wait for a remote process to finish execution """
935         delay = 1.0
936
937         while True:
938             status = self.status(pid, ppid)
939             
940             if status is ProcStatus.FINISHED:
941                 break
942             elif status is not ProcStatus.RUNNING:
943                 delay = delay * 1.5
944                 time.sleep(delay)
945                 # If it takes more than 20 seconds to start, then
946                 # asume something went wrong
947                 if delay > 20:
948                     break
949             else:
950                 # The app is running, just wait...
951                 time.sleep(0.5)
952
953     def check_output(self, home, filename):
954         """ Retrives content of file """
955         (out, err), proc = self.execute("cat %s" % 
956             os.path.join(home, filename), retry = 1, with_lock = True)
957         return (out, err), proc
958
959     def is_alive(self):
960         """ Checks if host is responsive
961         """
962         if self.localhost:
963             return True
964
965         out = err = ""
966         # The underlying SSH layer will sometimes return an empty
967         # output (even if the command was executed without errors).
968         # To work arround this, repeat the operation N times or
969         # until the result is not empty string
970         retrydelay = 1.0
971         for i in xrange(2):
972             try:
973                 (out, err), proc = self.execute("echo 'ALIVE'",
974                         retry = 5,
975                         blocking = True,
976                         with_lock = True)
977         
978                 if out.find("ALIVE") > -1:
979                     return True
980             except:
981                 trace = traceback.format_exc()
982                 msg = "Unresponsive host. Error reaching host: %s " % trace
983                 self.error(msg, out, err)
984                 return False
985
986             time.sleep(min(30.0, retrydelay))
987             retrydelay *= 1.5
988
989         if out.find("ALIVE") > -1:
990             return True
991         else:
992             msg = "Unresponsive host. Wrong answer. "
993             self.error(msg, out, err)
994             return False
995
996     def find_home(self):
997         """ Retrieves host home directory
998         """
999         # The underlying SSH layer will sometimes return an empty
1000         # output (even if the command was executed without errors).
1001         # To work arround this, repeat the operation N times or
1002         # until the result is not empty string
1003         retrydelay = 1.0
1004         for i in xrange(2):
1005             try:
1006                 (out, err), proc = self.execute("echo ${HOME}",
1007                         retry = 5,
1008                         blocking = True,
1009                         with_lock = True)
1010         
1011                 if out.strip() != "":
1012                     self._home_dir =  out.strip()
1013                     break
1014             except:
1015                 trace = traceback.format_exc()
1016                 msg = "Impossible to retrieve HOME directory" % trace
1017                 self.error(msg, out, err)
1018                 return False
1019
1020             time.sleep(min(30.0, retrydelay))
1021             retrydelay *= 1.5
1022
1023         if not self._home_dir:
1024             msg = "Impossible to retrieve HOME directory"
1025             self.error(msg, out, err)
1026             raise RuntimeError, msg
1027
1028     def filter_existing_files(self, src, dst):
1029         """ Removes files that already exist in the Linux host from src list
1030         """
1031         # construct a dictionary with { dst: src }
1032         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
1033             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1034
1035         command = []
1036         for d in dests.keys():
1037             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1038
1039         command = ";".join(command)
1040
1041         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1042     
1043         for d in dests.keys():
1044             if out.find(d) > -1:
1045                 del dests[d]
1046
1047         if not dests:
1048             return ""
1049
1050         return " ".join(dests.values())
1051