Code cleanup. Setting resource state through specific functions
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
22         reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145
146     @classmethod
147     def _register_attributes(cls):
148         hostname = Attribute("hostname", "Hostname of the machine",
149                 flags = Flags.ExecReadOnly)
150
151         username = Attribute("username", "Local account username", 
152                 flags = Flags.Credential)
153
154         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
155         
156         home = Attribute("home",
157                 "Experiment home directory to store all experiment related files",
158                 flags = Flags.ExecReadOnly)
159         
160         identity = Attribute("identity", "SSH identity file",
161                 flags = Flags.Credential)
162         
163         server_key = Attribute("serverKey", "Server public key", 
164                 flags = Flags.ExecReadOnly)
165         
166         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
167                 " from node home folder before starting experiment", 
168                 flags = Flags.ExecReadOnly)
169
170         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
171                 " from a previous same experiment, before the new experiment starts", 
172                 flags = Flags.ExecReadOnly)
173         
174         clean_processes = Attribute("cleanProcesses", 
175                 "Kill all running processes before starting experiment",
176                 flags = Flags.ExecReadOnly)
177         
178         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
179                 "releasing the resource",
180                 flags = Flags.ExecReadOnly)
181
182         cls._register_attribute(hostname)
183         cls._register_attribute(username)
184         cls._register_attribute(port)
185         cls._register_attribute(home)
186         cls._register_attribute(identity)
187         cls._register_attribute(server_key)
188         cls._register_attribute(clean_home)
189         cls._register_attribute(clean_experiment)
190         cls._register_attribute(clean_processes)
191         cls._register_attribute(tear_down)
192
193     def __init__(self, ec, guid):
194         super(LinuxNode, self).__init__(ec, guid)
195         self._os = None
196         # home directory at Linux host
197         self._home_dir = ""
198         
199         # lock to prevent concurrent applications on the same node,
200         # to execute commands at the same time. There are potential
201         # concurrency issues when using SSH to a same host from 
202         # multiple threads. There are also possible operational 
203         # issues, e.g. an application querying the existence 
204         # of a file or folder prior to its creation, and another 
205         # application creating the same file or folder in between.
206         self._node_lock = threading.Lock()
207     
208     def log_message(self, msg):
209         return " guid %d - host %s - %s " % (self.guid, 
210                 self.get("hostname"), msg)
211
212     @property
213     def home_dir(self):
214         home = self.get("home") or ""
215         if not home.startswith("/"):
216            home = os.path.join(self._home_dir, home) 
217         return home
218
219     @property
220     def usr_dir(self):
221         return os.path.join(self.home_dir, "nepi-usr")
222
223     @property
224     def lib_dir(self):
225         return os.path.join(self.usr_dir, "lib")
226
227     @property
228     def bin_dir(self):
229         return os.path.join(self.usr_dir, "bin")
230
231     @property
232     def src_dir(self):
233         return os.path.join(self.usr_dir, "src")
234
235     @property
236     def share_dir(self):
237         return os.path.join(self.usr_dir, "share")
238
239     @property
240     def exp_dir(self):
241         return os.path.join(self.home_dir, "nepi-exp")
242
243     @property
244     def exp_home(self):
245         return os.path.join(self.exp_dir, self.ec.exp_id)
246
247     @property
248     def node_home(self):
249         return os.path.join(self.exp_home, "node-%d" % self.guid)
250
251     @property
252     def run_home(self):
253         return os.path.join(self.node_home, self.ec.run_id)
254
255     @property
256     def os(self):
257         if self._os:
258             return self._os
259
260         if (not self.get("hostname") or not self.get("username")):
261             msg = "Can't resolve OS, insufficient data "
262             self.error(msg)
263             raise RuntimeError, msg
264
265         out = self.get_os()
266
267         if out.find("Fedora release 8") == 0:
268             self._os = OSType.FEDORA_8
269         elif out.find("Fedora release 12") == 0:
270             self._os = OSType.FEDORA_12
271         elif out.find("Fedora release 14") == 0:
272             self._os = OSType.FEDORA_14
273         elif out.find("Debian") == 0: 
274             self._os = OSType.DEBIAN
275         elif out.find("Ubuntu") ==0:
276             self._os = OSType.UBUNTU
277         else:
278             msg = "Unsupported OS"
279             self.error(msg, out)
280             raise RuntimeError, "%s - %s " %( msg, out )
281
282         return self._os
283
284     def get_os(self):
285         # The underlying SSH layer will sometimes return an empty
286         # output (even if the command was executed without errors).
287         # To work arround this, repeat the operation N times or
288         # until the result is not empty string
289         out = ""
290         retrydelay = 1.0
291         for i in xrange(10):
292             try:
293                 (out, err), proc = self.execute("cat /etc/issue", 
294                         retry = 5,
295                         with_lock = True,
296                         blocking = True)
297
298                 if out.strip() != "":
299                     return out
300             except:
301                 trace = traceback.format_exc()
302                 msg = "Error detecting OS: %s " % trace
303                 self.error(msg, out, err)
304                 return False
305
306             time.sleep(min(30.0, retrydelay))
307             retrydelay *= 1.5
308
309
310     @property
311     def use_deb(self):
312         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
313
314     @property
315     def use_rpm(self):
316         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
317                 OSType.FEDORA]
318
319     @property
320     def localhost(self):
321         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
322
323     def provision(self):
324         # check if host is alive
325         if not self.is_alive():
326             self.fail()
327             
328             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
329             self.error(msg)
330             raise RuntimeError, msg
331
332         self.find_home()
333
334         if self.get("cleanProcesses"):
335             self.clean_processes()
336
337         if self.get("cleanHome"):
338             self.clean_home()
339  
340         if self.get("cleanExperiment"):
341             self.clean_experiment()
342     
343         # Create shared directory structure
344         self.mkdir(self.lib_dir)
345         self.mkdir(self.bin_dir)
346         self.mkdir(self.src_dir)
347         self.mkdir(self.share_dir)
348
349         # Create experiment node home directory
350         self.mkdir(self.node_home)
351
352         super(LinuxNode, self).provision()
353
354     def deploy(self):
355         if self.state == ResourceState.NEW:
356             try:
357                 self.discover()
358                 self.provision()
359             except:
360                 self.fail()
361                 raise
362
363         # Node needs to wait until all associated interfaces are 
364         # ready before it can finalize deployment
365         from nepi.resources.linux.interface import LinuxInterface
366         ifaces = self.get_connected(LinuxInterface.rtype())
367         for iface in ifaces:
368             if iface.state < ResourceState.READY:
369                 self.ec.schedule(reschedule_delay, self.deploy)
370                 return 
371
372         super(LinuxNode, self).deploy()
373
374     def release(self):
375         # Node needs to wait until all associated RMs are released
376         # to be released
377         rms = self.get_connected()
378         for rm in rms:
379             if rm.state < ResourceState.STOPPED:
380                 self.ec.schedule(reschedule_delay, self.release)
381                 return 
382
383         tear_down = self.get("tearDown")
384         if tear_down:
385             self.execute(tear_down)
386
387         self.clean_processes()
388
389         super(LinuxNode, self).release()
390
391     def valid_connection(self, guid):
392         # TODO: Validate!
393         return True
394
395     def clean_processes(self, killer = False):
396         self.info("Cleaning up processes")
397         
398         if killer:
399             # Hardcore kill
400             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
401                 "sudo -S killall python tcpdump || /bin/true ; " +
402                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
403                 "sudo -S killall -u root || /bin/true ; " +
404                 "sudo -S killall -u root || /bin/true ; ")
405         else:
406             # Be gentler...
407             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
408                 "sudo -S killall tcpdump || /bin/true ; " +
409                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
410                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
411
412         out = err = ""
413         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
414             
415     def clean_home(self):
416         """ Cleans all NEPI related folders in the Linux host
417         """
418         self.info("Cleaning up home")
419         
420         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
421                 self.home_dir )
422
423         return self.execute(cmd, with_lock = True)
424
425     def clean_experiment(self):
426         """ Cleans all experiment related files in the Linux host.
427         It preserves NEPI files and folders that have a multi experiment
428         scope.
429         """
430         self.info("Cleaning up experiment files")
431         
432         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
433                 self.exp_dir,
434                 self.ec.exp_id )
435             
436         return self.execute(cmd, with_lock = True)
437
438     def execute(self, command,
439             sudo = False,
440             stdin = None, 
441             env = None,
442             tty = False,
443             forward_x11 = False,
444             timeout = None,
445             retry = 3,
446             err_on_timeout = True,
447             connect_timeout = 30,
448             strict_host_checking = False,
449             persistent = True,
450             blocking = True,
451             with_lock = False
452             ):
453         """ Notice that this invocation will block until the
454         execution finishes. If this is not the desired behavior,
455         use 'run' instead."""
456
457         if self.localhost:
458             (out, err), proc = execfuncs.lexec(command, 
459                     user = user,
460                     sudo = sudo,
461                     stdin = stdin,
462                     env = env)
463         else:
464             if with_lock:
465                 with self._node_lock:
466                     (out, err), proc = sshfuncs.rexec(
467                         command, 
468                         host = self.get("hostname"),
469                         user = self.get("username"),
470                         port = self.get("port"),
471                         agent = True,
472                         sudo = sudo,
473                         stdin = stdin,
474                         identity = self.get("identity"),
475                         server_key = self.get("serverKey"),
476                         env = env,
477                         tty = tty,
478                         forward_x11 = forward_x11,
479                         timeout = timeout,
480                         retry = retry,
481                         err_on_timeout = err_on_timeout,
482                         connect_timeout = connect_timeout,
483                         persistent = persistent,
484                         blocking = blocking, 
485                         strict_host_checking = strict_host_checking
486                         )
487             else:
488                 (out, err), proc = sshfuncs.rexec(
489                     command, 
490                     host = self.get("hostname"),
491                     user = self.get("username"),
492                     port = self.get("port"),
493                     agent = True,
494                     sudo = sudo,
495                     stdin = stdin,
496                     identity = self.get("identity"),
497                     server_key = self.get("serverKey"),
498                     env = env,
499                     tty = tty,
500                     forward_x11 = forward_x11,
501                     timeout = timeout,
502                     retry = retry,
503                     err_on_timeout = err_on_timeout,
504                     connect_timeout = connect_timeout,
505                     persistent = persistent,
506                     blocking = blocking, 
507                     strict_host_checking = strict_host_checking
508                     )
509
510         return (out, err), proc
511
512     def run(self, command, home,
513             create_home = False,
514             pidfile = 'pidfile',
515             stdin = None, 
516             stdout = 'stdout', 
517             stderr = 'stderr', 
518             sudo = False,
519             tty = False):
520         
521         self.debug("Running command '%s'" % command)
522         
523         if self.localhost:
524             (out, err), proc = execfuncs.lspawn(command, pidfile, 
525                     stdout = stdout, 
526                     stderr = stderr, 
527                     stdin = stdin, 
528                     home = home, 
529                     create_home = create_home, 
530                     sudo = sudo,
531                     user = user) 
532         else:
533             with self._node_lock:
534                 (out, err), proc = sshfuncs.rspawn(
535                     command,
536                     pidfile = pidfile,
537                     home = home,
538                     create_home = create_home,
539                     stdin = stdin if stdin is not None else '/dev/null',
540                     stdout = stdout if stdout else '/dev/null',
541                     stderr = stderr if stderr else '/dev/null',
542                     sudo = sudo,
543                     host = self.get("hostname"),
544                     user = self.get("username"),
545                     port = self.get("port"),
546                     agent = True,
547                     identity = self.get("identity"),
548                     server_key = self.get("serverKey"),
549                     tty = tty
550                     )
551
552         return (out, err), proc
553
554     def getpid(self, home, pidfile = "pidfile"):
555         if self.localhost:
556             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
557         else:
558             with self._node_lock:
559                 pidtuple = sshfuncs.rgetpid(
560                     os.path.join(home, pidfile),
561                     host = self.get("hostname"),
562                     user = self.get("username"),
563                     port = self.get("port"),
564                     agent = True,
565                     identity = self.get("identity"),
566                     server_key = self.get("serverKey")
567                     )
568         
569         return pidtuple
570
571     def status(self, pid, ppid):
572         if self.localhost:
573             status = execfuncs.lstatus(pid, ppid)
574         else:
575             with self._node_lock:
576                 status = sshfuncs.rstatus(
577                         pid, ppid,
578                         host = self.get("hostname"),
579                         user = self.get("username"),
580                         port = self.get("port"),
581                         agent = True,
582                         identity = self.get("identity"),
583                         server_key = self.get("serverKey")
584                         )
585            
586         return status
587     
588     def kill(self, pid, ppid, sudo = False):
589         out = err = ""
590         proc = None
591         status = self.status(pid, ppid)
592
593         if status == sshfuncs.ProcStatus.RUNNING:
594             if self.localhost:
595                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
596             else:
597                 with self._node_lock:
598                     (out, err), proc = sshfuncs.rkill(
599                         pid, ppid,
600                         host = self.get("hostname"),
601                         user = self.get("username"),
602                         port = self.get("port"),
603                         agent = True,
604                         sudo = sudo,
605                         identity = self.get("identity"),
606                         server_key = self.get("serverKey")
607                         )
608
609         return (out, err), proc
610
611     def copy(self, src, dst):
612         if self.localhost:
613             (out, err), proc = execfuncs.lcopy(source, dest, 
614                     recursive = True,
615                     strict_host_checking = False)
616         else:
617             with self._node_lock:
618                 (out, err), proc = sshfuncs.rcopy(
619                     src, dst, 
620                     port = self.get("port"),
621                     identity = self.get("identity"),
622                     server_key = self.get("serverKey"),
623                     recursive = True,
624                     strict_host_checking = False)
625
626         return (out, err), proc
627
628
629     def upload(self, src, dst, text = False, overwrite = True):
630         """ Copy content to destination
631
632            src  content to copy. Can be a local file, directory or a list of files
633
634            dst  destination path on the remote host (remote is always self.host)
635
636            text src is text input, it must be stored into a temp file before uploading
637         """
638         # If source is a string input 
639         f = None
640         if text and not os.path.isfile(src):
641             # src is text input that should be uploaded as file
642             # create a temporal file with the content to upload
643             f = tempfile.NamedTemporaryFile(delete=False)
644             f.write(src)
645             f.close()
646             src = f.name
647
648         # If dst files should not be overwritten, check that the files do not
649         # exits already 
650         if overwrite == False:
651             src = self.filter_existing_files(src, dst)
652             if not src:
653                 return ("", ""), None 
654
655         if not self.localhost:
656             # Build destination as <user>@<server>:<path>
657             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
658
659         result = self.copy(src, dst)
660
661         # clean up temp file
662         if f:
663             os.remove(f.name)
664
665         return result
666
667     def download(self, src, dst):
668         if not self.localhost:
669             # Build destination as <user>@<server>:<path>
670             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
671         return self.copy(src, dst)
672
673     def install_packages_command(self, packages):
674         command = ""
675         if self.use_rpm:
676             command = rpmfuncs.install_packages_command(self.os, packages)
677         elif self.use_deb:
678             command = debfuncs.install_packages_command(self.os, packages)
679         else:
680             msg = "Error installing packages ( OS not known ) "
681             self.error(msg, self.os)
682             raise RuntimeError, msg
683
684         return command
685
686     def install_packages(self, packages, home, run_home = None):
687         """ Install packages in the Linux host.
688
689         'home' is the directory to upload the package installation script.
690         'run_home' is the directory from where to execute the script.
691         """
692         command = self.install_packages_command(packages)
693
694         run_home = run_home or home
695
696         (out, err), proc = self.run_and_wait(command, run_home, 
697             shfile = os.path.join(home, "instpkg.sh"),
698             pidfile = "instpkg_pidfile",
699             ecodefile = "instpkg_exitcode",
700             stdout = "instpkg_stdout", 
701             stderr = "instpkg_stderr",
702             overwrite = False,
703             raise_on_error = True)
704
705         return (out, err), proc 
706
707     def remove_packages(self, packages, home, run_home = None):
708         """ Uninstall packages from the Linux host.
709
710         'home' is the directory to upload the package un-installation script.
711         'run_home' is the directory from where to execute the script.
712         """
713         if self.use_rpm:
714             command = rpmfuncs.remove_packages_command(self.os, packages)
715         elif self.use_deb:
716             command = debfuncs.remove_packages_command(self.os, packages)
717         else:
718             msg = "Error removing packages ( OS not known ) "
719             self.error(msg)
720             raise RuntimeError, msg
721
722         run_home = run_home or home
723
724         (out, err), proc = self.run_and_wait(command, run_home, 
725             shfile = os.path.join(home, "rmpkg.sh"),
726             pidfile = "rmpkg_pidfile",
727             ecodefile = "rmpkg_exitcode",
728             stdout = "rmpkg_stdout", 
729             stderr = "rmpkg_stderr",
730             overwrite = False,
731             raise_on_error = True)
732          
733         return (out, err), proc 
734
735     def mkdir(self, path, clean = False):
736         if clean:
737             self.rmdir(path)
738
739         return self.execute("mkdir -p %s" % path, with_lock = True)
740
741     def rmdir(self, path):
742         return self.execute("rm -rf %s" % path, with_lock = True)
743         
744     def run_and_wait(self, command, home, 
745             shfile = "cmd.sh",
746             env = None,
747             overwrite = True,
748             pidfile = "pidfile", 
749             ecodefile = "exitcode", 
750             stdin = None, 
751             stdout = "stdout", 
752             stderr = "stderr", 
753             sudo = False,
754             tty = False,
755             raise_on_error = False):
756         """
757         Uploads the 'command' to a bash script in the host.
758         Then runs the script detached in background in the host, and
759         busy-waites until the script finishes executing.
760         """
761
762         if not shfile.startswith("/"):
763             shfile = os.path.join(home, shfile)
764
765         self.upload_command(command, 
766             shfile = shfile, 
767             ecodefile = ecodefile, 
768             env = env,
769             overwrite = overwrite)
770
771         command = "bash %s" % shfile
772         # run command in background in remote host
773         (out, err), proc = self.run(command, home, 
774                 pidfile = pidfile,
775                 stdin = stdin, 
776                 stdout = stdout, 
777                 stderr = stderr, 
778                 sudo = sudo,
779                 tty = tty)
780
781         # check no errors occurred
782         if proc.poll():
783             msg = " Failed to run command '%s' " % command
784             self.error(msg, out, err)
785             if raise_on_error:
786                 raise RuntimeError, msg
787
788         # Wait for pid file to be generated
789         pid, ppid = self.wait_pid(
790                 home = home, 
791                 pidfile = pidfile, 
792                 raise_on_error = raise_on_error)
793
794         # wait until command finishes to execute
795         self.wait_run(pid, ppid)
796       
797         (eout, err), proc = self.check_errors(home,
798             ecodefile = ecodefile,
799             stderr = stderr)
800
801         # Out is what was written in the stderr file
802         if err:
803             msg = " Failed to run command '%s' " % command
804             self.error(msg, eout, err)
805
806             if raise_on_error:
807                 raise RuntimeError, msg
808
809         (out, oerr), proc = self.check_output(home, stdout)
810         
811         return (out, err), proc
812
813     def exitcode(self, home, ecodefile = "exitcode"):
814         """
815         Get the exit code of an application.
816         Returns an integer value with the exit code 
817         """
818         (out, err), proc = self.check_output(home, ecodefile)
819
820         # Succeeded to open file, return exit code in the file
821         if proc.wait() == 0:
822             try:
823                 return int(out.strip())
824             except:
825                 # Error in the content of the file!
826                 return ExitCode.CORRUPTFILE
827
828         # No such file or directory
829         if proc.returncode == 1:
830             return ExitCode.FILENOTFOUND
831         
832         # Other error from 'cat'
833         return ExitCode.ERROR
834
835     def upload_command(self, command, 
836             shfile = "cmd.sh",
837             ecodefile = "exitcode",
838             overwrite = True,
839             env = None):
840         """ Saves the command as a bash script file in the remote host, and
841         forces to save the exit code of the command execution to the ecodefile
842         """
843
844         if not (command.strip().endswith(";") or command.strip().endswith("&")):
845             command += ";"
846       
847         # The exit code of the command will be stored in ecodefile
848         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
849                 'command': command,
850                 'ecodefile': ecodefile,
851                 } 
852
853         # Export environment
854         environ = self.format_environment(env)
855
856         # Add environ to command
857         command = environ + command
858
859         return self.upload(command, shfile, text = True, overwrite = overwrite)
860
861     def format_environment(self, env, inline = False):
862         """ Formats the environment variables for a command to be executed
863         either as an inline command
864         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
865         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
866         """
867         if not env: return ""
868
869         # Remove extra white spaces
870         env = re.sub(r'\s+', ' ', env.strip())
871
872         sep = ";" if inline else "\n"
873         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
874
875     def check_errors(self, home, 
876             ecodefile = "exitcode", 
877             stderr = "stderr"):
878         """ Checks whether errors occurred while running a command.
879         It first checks the exit code for the command, and only if the
880         exit code is an error one it returns the error output.
881
882         """
883         proc = None
884         err = ""
885
886         # get exit code saved in the 'exitcode' file
887         ecode = self.exitcode(home, ecodefile)
888
889         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
890             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
891         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
892             # The process returned an error code or didn't exist. 
893             # Check standard error.
894             (err, eerr), proc = self.check_output(home, stderr)
895
896             # If the stderr file was not found, assume nothing bad happened,
897             # and just ignore the error.
898             # (cat returns 1 for error "No such file or directory")
899             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
900                 err = "" 
901             
902         return ("", err), proc
903  
904     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
905         """ Waits until the pid file for the command is generated, 
906             and returns the pid and ppid of the process """
907         pid = ppid = None
908         delay = 1.0
909
910         for i in xrange(4):
911             pidtuple = self.getpid(home = home, pidfile = pidfile)
912             
913             if pidtuple:
914                 pid, ppid = pidtuple
915                 break
916             else:
917                 time.sleep(delay)
918                 delay = delay * 1.5
919         else:
920             msg = " Failed to get pid for pidfile %s/%s " % (
921                     home, pidfile )
922             self.error(msg)
923             
924             if raise_on_error:
925                 raise RuntimeError, msg
926
927         return pid, ppid
928
929     def wait_run(self, pid, ppid, trial = 0):
930         """ wait for a remote process to finish execution """
931         delay = 1.0
932
933         while True:
934             status = self.status(pid, ppid)
935             
936             if status is ProcStatus.FINISHED:
937                 break
938             elif status is not ProcStatus.RUNNING:
939                 delay = delay * 1.5
940                 time.sleep(delay)
941                 # If it takes more than 20 seconds to start, then
942                 # asume something went wrong
943                 if delay > 20:
944                     break
945             else:
946                 # The app is running, just wait...
947                 time.sleep(0.5)
948
949     def check_output(self, home, filename):
950         """ Retrives content of file """
951         (out, err), proc = self.execute("cat %s" % 
952             os.path.join(home, filename), retry = 1, with_lock = True)
953         return (out, err), proc
954
955     def is_alive(self):
956         """ Checks if host is responsive
957         """
958         if self.localhost:
959             return True
960
961         out = err = ""
962         # The underlying SSH layer will sometimes return an empty
963         # output (even if the command was executed without errors).
964         # To work arround this, repeat the operation N times or
965         # until the result is not empty string
966         retrydelay = 1.0
967         for i in xrange(10):
968             try:
969                 (out, err), proc = self.execute("echo 'ALIVE'",
970                         retry = 5,
971                         blocking = True,
972                         with_lock = True)
973         
974                 if out.find("ALIVE") > -1:
975                     return True
976             except:
977                 trace = traceback.format_exc()
978                 msg = "Unresponsive host. Error reaching host: %s " % trace
979                 self.error(msg, out, err)
980                 return False
981
982             time.sleep(min(30.0, retrydelay))
983             retrydelay *= 1.5
984
985         if out.find("ALIVE") > -1:
986             return True
987         else:
988             msg = "Unresponsive host. Wrong answer. "
989             self.error(msg, out, err)
990             return False
991
992     def find_home(self):
993         """ Retrieves host home directory
994         """
995         # The underlying SSH layer will sometimes return an empty
996         # output (even if the command was executed without errors).
997         # To work arround this, repeat the operation N times or
998         # until the result is not empty string
999         retrydelay = 1.0
1000         for i in xrange(10):
1001             try:
1002                 (out, err), proc = self.execute("echo ${HOME}",
1003                         retry = 5,
1004                         blocking = True,
1005                         with_lock = True)
1006         
1007                 if out.strip() != "":
1008                     self._home_dir =  out.strip()
1009                     break
1010             except:
1011                 trace = traceback.format_exc()
1012                 msg = "Impossible to retrieve HOME directory" % trace
1013                 self.error(msg, out, err)
1014                 return False
1015
1016             time.sleep(min(30.0, retrydelay))
1017             retrydelay *= 1.5
1018
1019         if not self._home_dir:
1020             msg = "Impossible to retrieve HOME directory"
1021             self.error(msg, out, err)
1022             raise RuntimeError, msg
1023
1024     def filter_existing_files(self, src, dst):
1025         """ Removes files that already exist in the Linux host from src list
1026         """
1027         # construct a dictionary with { dst: src }
1028         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
1029             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1030
1031         command = []
1032         for d in dests.keys():
1033             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1034
1035         command = ";".join(command)
1036
1037         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1038     
1039         for d in dests.keys():
1040             if out.find(d) > -1:
1041                 del dests[d]
1042
1043         if not dests:
1044             return ""
1045
1046         return " ".join(dests.values())
1047