Fixing issues in LinuxNode whith high concurrency
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
22         reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145
146     @classmethod
147     def _register_attributes(cls):
148         hostname = Attribute("hostname", "Hostname of the machine",
149                 flags = Flags.ExecReadOnly)
150
151         username = Attribute("username", "Local account username", 
152                 flags = Flags.Credential)
153
154         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
155         
156         home = Attribute("home",
157                 "Experiment home directory to store all experiment related files",
158                 flags = Flags.ExecReadOnly)
159         
160         identity = Attribute("identity", "SSH identity file",
161                 flags = Flags.Credential)
162         
163         server_key = Attribute("serverKey", "Server public key", 
164                 flags = Flags.ExecReadOnly)
165         
166         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
167                 " from node home folder before starting experiment", 
168                 flags = Flags.ExecReadOnly)
169
170         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
171                 " from a previous same experiment, before the new experiment starts", 
172                 flags = Flags.ExecReadOnly)
173         
174         clean_processes = Attribute("cleanProcesses", 
175                 "Kill all running processes before starting experiment",
176                 flags = Flags.ExecReadOnly)
177         
178         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
179                 "releasing the resource",
180                 flags = Flags.ExecReadOnly)
181
182         cls._register_attribute(hostname)
183         cls._register_attribute(username)
184         cls._register_attribute(port)
185         cls._register_attribute(home)
186         cls._register_attribute(identity)
187         cls._register_attribute(server_key)
188         cls._register_attribute(clean_home)
189         cls._register_attribute(clean_experiment)
190         cls._register_attribute(clean_processes)
191         cls._register_attribute(tear_down)
192
193     def __init__(self, ec, guid):
194         super(LinuxNode, self).__init__(ec, guid)
195         self._os = None
196         # home directory at Linux host
197         self._home_dir = ""
198         
199         # lock to avoid concurrency issues on methods used by applications 
200         self._lock = threading.Lock()
201     
202     def log_message(self, msg):
203         return " guid %d - host %s - %s " % (self.guid, 
204                 self.get("hostname"), msg)
205
206     @property
207     def home_dir(self):
208         home = self.get("home") or ""
209         if not home.startswith("/"):
210            home = os.path.join(self._home_dir, home) 
211         return home
212
213     @property
214     def usr_dir(self):
215         return os.path.join(self.home_dir, "nepi-usr")
216
217     @property
218     def lib_dir(self):
219         return os.path.join(self.usr_dir, "lib")
220
221     @property
222     def bin_dir(self):
223         return os.path.join(self.usr_dir, "bin")
224
225     @property
226     def src_dir(self):
227         return os.path.join(self.usr_dir, "src")
228
229     @property
230     def share_dir(self):
231         return os.path.join(self.usr_dir, "share")
232
233     @property
234     def exp_dir(self):
235         return os.path.join(self.home_dir, "nepi-exp")
236
237     @property
238     def exp_home(self):
239         return os.path.join(self.exp_dir, self.ec.exp_id)
240
241     @property
242     def node_home(self):
243         return os.path.join(self.exp_home, "node-%d" % self.guid)
244
245     @property
246     def run_home(self):
247         return os.path.join(self.node_home, self.ec.run_id)
248
249     @property
250     def os(self):
251         if self._os:
252             return self._os
253
254         if (not self.get("hostname") or not self.get("username")):
255             msg = "Can't resolve OS, insufficient data "
256             self.error(msg)
257             raise RuntimeError, msg
258
259         out = self.get_os()
260
261         if out.find("Fedora release 8") == 0:
262             self._os = OSType.FEDORA_8
263         elif out.find("Fedora release 12") == 0:
264             self._os = OSType.FEDORA_12
265         elif out.find("Fedora release 14") == 0:
266             self._os = OSType.FEDORA_14
267         elif out.find("Debian") == 0: 
268             self._os = OSType.DEBIAN
269         elif out.find("Ubuntu") ==0:
270             self._os = OSType.UBUNTU
271         else:
272             msg = "Unsupported OS"
273             self.error(msg, out)
274             raise RuntimeError, "%s - %s " %( msg, out )
275
276         return self._os
277
278     def get_os(self):
279         # The underlying SSH layer will sometimes return an empty
280         # output (even if the command was executed without errors).
281         # To work arround this, repeat the operation N times or
282         # until the result is not empty string
283         out = ""
284         retrydelay = 1.0
285         for i in xrange(10):
286             try:
287                 (out, err), proc = self.execute("cat /etc/issue", 
288                         retry = 5,
289                         with_lock = True,
290                         blocking = True)
291
292                 if out.strip() != "":
293                     return out
294             except:
295                 trace = traceback.format_exc()
296                 msg = "Error detecting OS: %s " % trace
297                 self.error(msg, out, err)
298                 return False
299
300             time.sleep(min(30.0, retrydelay))
301             retrydelay *= 1.5
302
303
304     @property
305     def use_deb(self):
306         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
307
308     @property
309     def use_rpm(self):
310         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
311                 OSType.FEDORA]
312
313     @property
314     def localhost(self):
315         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
316
317     def provision(self):
318         # check if host is alive
319         if not self.is_alive():
320             self.fail()
321             
322             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
323             self.error(msg)
324             raise RuntimeError, msg
325
326         self.find_home()
327
328         if self.get("cleanProcesses"):
329             self.clean_processes()
330
331         if self.get("cleanHome"):
332             self.clean_home()
333  
334         if self.get("cleanExperiment"):
335             self.clean_experiment()
336     
337         # Create shared directory structure
338         self.mkdir(self.lib_dir)
339         self.mkdir(self.bin_dir)
340         self.mkdir(self.src_dir)
341         self.mkdir(self.share_dir)
342
343         # Create experiment node home directory
344         self.mkdir(self.node_home)
345
346         super(LinuxNode, self).provision()
347
348     def deploy(self):
349         if self.state == ResourceState.NEW:
350             try:
351                 self.discover()
352                 self.provision()
353             except:
354                 self._state = ResourceState.FAILED
355                 raise
356
357         # Node needs to wait until all associated interfaces are 
358         # ready before it can finalize deployment
359         from nepi.resources.linux.interface import LinuxInterface
360         ifaces = self.get_connected(LinuxInterface.rtype())
361         for iface in ifaces:
362             if iface.state < ResourceState.READY:
363                 self.ec.schedule(reschedule_delay, self.deploy)
364                 return 
365
366         super(LinuxNode, self).deploy()
367
368     def release(self):
369         # Node needs to wait until all associated RMs are released
370         # to be released
371         rms = self.get_connected()
372         for rm in rms:
373             if rm.state < ResourceState.STOPPED:
374                 self.ec.schedule(reschedule_delay, self.release)
375                 return 
376
377         tear_down = self.get("tearDown")
378         if tear_down:
379             self.execute(tear_down)
380
381         self.clean_processes()
382
383         super(LinuxNode, self).release()
384
385     def valid_connection(self, guid):
386         # TODO: Validate!
387         return True
388
389     def clean_processes(self, killer = False):
390         self.info("Cleaning up processes")
391         
392         if killer:
393             # Hardcore kill
394             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
395                 "sudo -S killall python tcpdump || /bin/true ; " +
396                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
397                 "sudo -S killall -u root || /bin/true ; " +
398                 "sudo -S killall -u root || /bin/true ; ")
399         else:
400             # Be gentler...
401             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
402                 "sudo -S killall tcpdump || /bin/true ; " +
403                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
404                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
405
406         out = err = ""
407         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
408             
409     def clean_home(self):
410         """ Cleans all NEPI related folders in the Linux host
411         """
412         self.info("Cleaning up home")
413         
414         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
415                 self.home_dir )
416
417         return self.execute(cmd, with_lock = True)
418
419     def clean_experiment(self):
420         """ Cleans all experiment related files in the Linux host.
421         It preserves NEPI files and folders that have a multi experiment
422         scope.
423         """
424         self.info("Cleaning up experiment files")
425         
426         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
427                 self.exp_dir,
428                 self.ec.exp_id )
429             
430         return self.execute(cmd, with_lock = True)
431
432     def execute(self, command,
433             sudo = False,
434             stdin = None, 
435             env = None,
436             tty = False,
437             forward_x11 = False,
438             timeout = None,
439             retry = 3,
440             err_on_timeout = True,
441             connect_timeout = 30,
442             strict_host_checking = False,
443             persistent = True,
444             blocking = True,
445             with_lock = False
446             ):
447         """ Notice that this invocation will block until the
448         execution finishes. If this is not the desired behavior,
449         use 'run' instead."""
450
451         if self.localhost:
452             (out, err), proc = execfuncs.lexec(command, 
453                     user = user,
454                     sudo = sudo,
455                     stdin = stdin,
456                     env = env)
457         else:
458             if with_lock:
459                 with self._lock:
460                     (out, err), proc = sshfuncs.rexec(
461                         command, 
462                         host = self.get("hostname"),
463                         user = self.get("username"),
464                         port = self.get("port"),
465                         agent = True,
466                         sudo = sudo,
467                         stdin = stdin,
468                         identity = self.get("identity"),
469                         server_key = self.get("serverKey"),
470                         env = env,
471                         tty = tty,
472                         forward_x11 = forward_x11,
473                         timeout = timeout,
474                         retry = retry,
475                         err_on_timeout = err_on_timeout,
476                         connect_timeout = connect_timeout,
477                         persistent = persistent,
478                         blocking = blocking, 
479                         strict_host_checking = strict_host_checking
480                         )
481             else:
482                 (out, err), proc = sshfuncs.rexec(
483                     command, 
484                     host = self.get("hostname"),
485                     user = self.get("username"),
486                     port = self.get("port"),
487                     agent = True,
488                     sudo = sudo,
489                     stdin = stdin,
490                     identity = self.get("identity"),
491                     server_key = self.get("serverKey"),
492                     env = env,
493                     tty = tty,
494                     forward_x11 = forward_x11,
495                     timeout = timeout,
496                     retry = retry,
497                     err_on_timeout = err_on_timeout,
498                     connect_timeout = connect_timeout,
499                     persistent = persistent,
500                     blocking = blocking, 
501                     strict_host_checking = strict_host_checking
502                     )
503
504         return (out, err), proc
505
506     def run(self, command, home,
507             create_home = False,
508             pidfile = 'pidfile',
509             stdin = None, 
510             stdout = 'stdout', 
511             stderr = 'stderr', 
512             sudo = False,
513             tty = False):
514         
515         self.debug("Running command '%s'" % command)
516         
517         if self.localhost:
518             (out, err), proc = execfuncs.lspawn(command, pidfile, 
519                     stdout = stdout, 
520                     stderr = stderr, 
521                     stdin = stdin, 
522                     home = home, 
523                     create_home = create_home, 
524                     sudo = sudo,
525                     user = user) 
526         else:
527             with self._lock:
528                 (out, err), proc = sshfuncs.rspawn(
529                     command,
530                     pidfile = pidfile,
531                     home = home,
532                     create_home = create_home,
533                     stdin = stdin if stdin is not None else '/dev/null',
534                     stdout = stdout if stdout else '/dev/null',
535                     stderr = stderr if stderr else '/dev/null',
536                     sudo = sudo,
537                     host = self.get("hostname"),
538                     user = self.get("username"),
539                     port = self.get("port"),
540                     agent = True,
541                     identity = self.get("identity"),
542                     server_key = self.get("serverKey"),
543                     tty = tty
544                     )
545
546         return (out, err), proc
547
548     def getpid(self, home, pidfile = "pidfile"):
549         if self.localhost:
550             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
551         else:
552             with self._lock:
553                 pidtuple = sshfuncs.rgetpid(
554                     os.path.join(home, pidfile),
555                     host = self.get("hostname"),
556                     user = self.get("username"),
557                     port = self.get("port"),
558                     agent = True,
559                     identity = self.get("identity"),
560                     server_key = self.get("serverKey")
561                     )
562         
563         return pidtuple
564
565     def status(self, pid, ppid):
566         if self.localhost:
567             status = execfuncs.lstatus(pid, ppid)
568         else:
569             with self._lock:
570                 status = sshfuncs.rstatus(
571                         pid, ppid,
572                         host = self.get("hostname"),
573                         user = self.get("username"),
574                         port = self.get("port"),
575                         agent = True,
576                         identity = self.get("identity"),
577                         server_key = self.get("serverKey")
578                         )
579            
580         return status
581     
582     def kill(self, pid, ppid, sudo = False):
583         out = err = ""
584         proc = None
585         status = self.status(pid, ppid)
586
587         if status == sshfuncs.ProcStatus.RUNNING:
588             if self.localhost:
589                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
590             else:
591                 with self._lock:
592                     (out, err), proc = sshfuncs.rkill(
593                         pid, ppid,
594                         host = self.get("hostname"),
595                         user = self.get("username"),
596                         port = self.get("port"),
597                         agent = True,
598                         sudo = sudo,
599                         identity = self.get("identity"),
600                         server_key = self.get("serverKey")
601                         )
602
603         return (out, err), proc
604
605     def copy(self, src, dst):
606         if self.localhost:
607             (out, err), proc = execfuncs.lcopy(source, dest, 
608                     recursive = True,
609                     strict_host_checking = False)
610         else:
611             with self._lock:
612                 (out, err), proc = sshfuncs.rcopy(
613                     src, dst, 
614                     port = self.get("port"),
615                     identity = self.get("identity"),
616                     server_key = self.get("serverKey"),
617                     recursive = True,
618                     strict_host_checking = False)
619
620         return (out, err), proc
621
622
623     def upload(self, src, dst, text = False, overwrite = True):
624         """ Copy content to destination
625
626            src  content to copy. Can be a local file, directory or a list of files
627
628            dst  destination path on the remote host (remote is always self.host)
629
630            text src is text input, it must be stored into a temp file before uploading
631         """
632         # If source is a string input 
633         f = None
634         if text and not os.path.isfile(src):
635             # src is text input that should be uploaded as file
636             # create a temporal file with the content to upload
637             f = tempfile.NamedTemporaryFile(delete=False)
638             f.write(src)
639             f.close()
640             src = f.name
641
642         # If dst files should not be overwritten, check that the files do not
643         # exits already 
644         if overwrite == False:
645             src = self.filter_existing_files(src, dst)
646             if not src:
647                 return ("", ""), None 
648
649         if not self.localhost:
650             # Build destination as <user>@<server>:<path>
651             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
652
653         result = self.copy(src, dst)
654
655         # clean up temp file
656         if f:
657             os.remove(f.name)
658
659         return result
660
661     def download(self, src, dst):
662         if not self.localhost:
663             # Build destination as <user>@<server>:<path>
664             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
665         return self.copy(src, dst)
666
667     def install_packages_command(self, packages):
668         command = ""
669         if self.use_rpm:
670             command = rpmfuncs.install_packages_command(self.os, packages)
671         elif self.use_deb:
672             command = debfuncs.install_packages_command(self.os, packages)
673         else:
674             msg = "Error installing packages ( OS not known ) "
675             self.error(msg, self.os)
676             raise RuntimeError, msg
677
678         return command
679
680     def install_packages(self, packages, home, run_home = None):
681         """ Install packages in the Linux host.
682
683         'home' is the directory to upload the package installation script.
684         'run_home' is the directory from where to execute the script.
685         """
686         command = self.install_packages_command(packages)
687
688         run_home = run_home or home
689
690         (out, err), proc = self.run_and_wait(command, run_home, 
691             shfile = os.path.join(home, "instpkg.sh"),
692             pidfile = "instpkg_pidfile",
693             ecodefile = "instpkg_exitcode",
694             stdout = "instpkg_stdout", 
695             stderr = "instpkg_stderr",
696             overwrite = False,
697             raise_on_error = True)
698
699         return (out, err), proc 
700
701     def remove_packages(self, packages, home, run_home = None):
702         """ Uninstall packages from the Linux host.
703
704         'home' is the directory to upload the package un-installation script.
705         'run_home' is the directory from where to execute the script.
706         """
707         if self.use_rpm:
708             command = rpmfuncs.remove_packages_command(self.os, packages)
709         elif self.use_deb:
710             command = debfuncs.remove_packages_command(self.os, packages)
711         else:
712             msg = "Error removing packages ( OS not known ) "
713             self.error(msg)
714             raise RuntimeError, msg
715
716         run_home = run_home or home
717
718         (out, err), proc = self.run_and_wait(command, run_home, 
719             shfile = os.path.join(home, "rmpkg.sh"),
720             pidfile = "rmpkg_pidfile",
721             ecodefile = "rmpkg_exitcode",
722             stdout = "rmpkg_stdout", 
723             stderr = "rmpkg_stderr",
724             overwrite = False,
725             raise_on_error = True)
726          
727         return (out, err), proc 
728
729     def mkdir(self, path, clean = False):
730         if clean:
731             self.rmdir(path)
732
733         return self.execute("mkdir -p %s" % path, with_lock = True)
734
735     def rmdir(self, path):
736         return self.execute("rm -rf %s" % path, with_lock = True)
737         
738     def run_and_wait(self, command, home, 
739             shfile = "cmd.sh",
740             env = None,
741             overwrite = True,
742             pidfile = "pidfile", 
743             ecodefile = "exitcode", 
744             stdin = None, 
745             stdout = "stdout", 
746             stderr = "stderr", 
747             sudo = False,
748             tty = False,
749             raise_on_error = False):
750         """
751         Uploads the 'command' to a bash script in the host.
752         Then runs the script detached in background in the host, and
753         busy-waites until the script finishes executing.
754         """
755
756         if not shfile.startswith("/"):
757             shfile = os.path.join(home, shfile)
758
759         self.upload_command(command, 
760             shfile = shfile, 
761             ecodefile = ecodefile, 
762             env = env,
763             overwrite = overwrite)
764
765         command = "bash %s" % shfile
766         # run command in background in remote host
767         (out, err), proc = self.run(command, home, 
768                 pidfile = pidfile,
769                 stdin = stdin, 
770                 stdout = stdout, 
771                 stderr = stderr, 
772                 sudo = sudo,
773                 tty = tty)
774
775         # check no errors occurred
776         if proc.poll():
777             msg = " Failed to run command '%s' " % command
778             self.error(msg, out, err)
779             if raise_on_error:
780                 raise RuntimeError, msg
781
782         # Wait for pid file to be generated
783         pid, ppid = self.wait_pid(
784                 home = home, 
785                 pidfile = pidfile, 
786                 raise_on_error = raise_on_error)
787
788         # wait until command finishes to execute
789         self.wait_run(pid, ppid)
790       
791         (eout, err), proc = self.check_errors(home,
792             ecodefile = ecodefile,
793             stderr = stderr)
794
795         # Out is what was written in the stderr file
796         if err:
797             msg = " Failed to run command '%s' " % command
798             self.error(msg, eout, err)
799
800             if raise_on_error:
801                 raise RuntimeError, msg
802
803         (out, oerr), proc = self.check_output(home, stdout)
804         
805         return (out, err), proc
806
807     def exitcode(self, home, ecodefile = "exitcode"):
808         """
809         Get the exit code of an application.
810         Returns an integer value with the exit code 
811         """
812         (out, err), proc = self.check_output(home, ecodefile)
813
814         # Succeeded to open file, return exit code in the file
815         if proc.wait() == 0:
816             try:
817                 return int(out.strip())
818             except:
819                 # Error in the content of the file!
820                 return ExitCode.CORRUPTFILE
821
822         # No such file or directory
823         if proc.returncode == 1:
824             return ExitCode.FILENOTFOUND
825         
826         # Other error from 'cat'
827         return ExitCode.ERROR
828
829     def upload_command(self, command, 
830             shfile = "cmd.sh",
831             ecodefile = "exitcode",
832             overwrite = True,
833             env = None):
834         """ Saves the command as a bash script file in the remote host, and
835         forces to save the exit code of the command execution to the ecodefile
836         """
837
838         if not (command.strip().endswith(";") or command.strip().endswith("&")):
839             command += ";"
840       
841         # The exit code of the command will be stored in ecodefile
842         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
843                 'command': command,
844                 'ecodefile': ecodefile,
845                 } 
846
847         # Export environment
848         environ = self.format_environment(env)
849
850         # Add environ to command
851         command = environ + command
852
853         return self.upload(command, shfile, text = True, overwrite = overwrite)
854
855     def format_environment(self, env, inline = False):
856         """ Formats the environment variables for a command to be executed
857         either as an inline command
858         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
859         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
860         """
861         if not env: return ""
862
863         # Remove extra white spaces
864         env = re.sub(r'\s+', ' ', env.strip())
865
866         sep = ";" if inline else "\n"
867         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
868
869     def check_errors(self, home, 
870             ecodefile = "exitcode", 
871             stderr = "stderr"):
872         """ Checks whether errors occurred while running a command.
873         It first checks the exit code for the command, and only if the
874         exit code is an error one it returns the error output.
875
876         """
877         proc = None
878         err = ""
879
880         # get exit code saved in the 'exitcode' file
881         ecode = self.exitcode(home, ecodefile)
882
883         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
884             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
885         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
886             # The process returned an error code or didn't exist. 
887             # Check standard error.
888             (err, eerr), proc = self.check_output(home, stderr)
889
890             # If the stderr file was not found, assume nothing bad happened,
891             # and just ignore the error.
892             # (cat returns 1 for error "No such file or directory")
893             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
894                 err = "" 
895             
896         return ("", err), proc
897  
898     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
899         """ Waits until the pid file for the command is generated, 
900             and returns the pid and ppid of the process """
901         pid = ppid = None
902         delay = 1.0
903
904         for i in xrange(4):
905             pidtuple = self.getpid(home = home, pidfile = pidfile)
906             
907             if pidtuple:
908                 pid, ppid = pidtuple
909                 break
910             else:
911                 time.sleep(delay)
912                 delay = delay * 1.5
913         else:
914             msg = " Failed to get pid for pidfile %s/%s " % (
915                     home, pidfile )
916             self.error(msg)
917             
918             if raise_on_error:
919                 raise RuntimeError, msg
920
921         return pid, ppid
922
923     def wait_run(self, pid, ppid, trial = 0):
924         """ wait for a remote process to finish execution """
925         delay = 1.0
926
927         while True:
928             status = self.status(pid, ppid)
929             
930             if status is ProcStatus.FINISHED:
931                 break
932             elif status is not ProcStatus.RUNNING:
933                 delay = delay * 1.5
934                 time.sleep(delay)
935                 # If it takes more than 20 seconds to start, then
936                 # asume something went wrong
937                 if delay > 20:
938                     break
939             else:
940                 # The app is running, just wait...
941                 time.sleep(0.5)
942
943     def check_output(self, home, filename):
944         """ Retrives content of file """
945         (out, err), proc = self.execute("cat %s" % 
946             os.path.join(home, filename), retry = 1, with_lock = True)
947         return (out, err), proc
948
949     def is_alive(self):
950         """ Checks if host is responsive
951         """
952         if self.localhost:
953             return True
954
955         out = err = ""
956         # The underlying SSH layer will sometimes return an empty
957         # output (even if the command was executed without errors).
958         # To work arround this, repeat the operation N times or
959         # until the result is not empty string
960         retrydelay = 1.0
961         for i in xrange(10):
962             try:
963                 (out, err), proc = self.execute("echo 'ALIVE'",
964                         retry = 5,
965                         blocking = True,
966                         with_lock = True)
967         
968                 if out.find("ALIVE") > -1:
969                     return True
970             except:
971                 trace = traceback.format_exc()
972                 msg = "Unresponsive host. Error reaching host: %s " % trace
973                 self.error(msg, out, err)
974                 return False
975
976             time.sleep(min(30.0, retrydelay))
977             retrydelay *= 1.5
978
979         if out.find("ALIVE") > -1:
980             return True
981         else:
982             msg = "Unresponsive host. Wrong answer. "
983             self.error(msg, out, err)
984             return False
985
986     def find_home(self):
987         """ Retrieves host home directory
988         """
989         # The underlying SSH layer will sometimes return an empty
990         # output (even if the command was executed without errors).
991         # To work arround this, repeat the operation N times or
992         # until the result is not empty string
993         retrydelay = 1.0
994         for i in xrange(10):
995             try:
996                 (out, err), proc = self.execute("echo ${HOME}",
997                         retry = 5,
998                         blocking = True,
999                         with_lock = True)
1000         
1001                 if out.strip() != "":
1002                     self._home_dir =  out.strip()
1003                     break
1004             except:
1005                 trace = traceback.format_exc()
1006                 msg = "Impossible to retrieve HOME directory" % trace
1007                 self.error(msg, out, err)
1008                 return False
1009
1010             time.sleep(min(30.0, retrydelay))
1011             retrydelay *= 1.5
1012
1013         if not self._home_dir:
1014             msg = "Impossible to retrieve HOME directory"
1015             self.error(msg, out, err)
1016             raise RuntimeError, msg
1017
1018     def filter_existing_files(self, src, dst):
1019         """ Removes files that already exist in the Linux host from src list
1020         """
1021         # construct a dictionary with { dst: src }
1022         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
1023             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1024
1025         command = []
1026         for d in dests.keys():
1027             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1028
1029         command = ";".join(command)
1030
1031         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1032     
1033         for d in dests.keys():
1034             if out.find(d) > -1:
1035                 del dests[d]
1036
1037         if not dests:
1038             return ""
1039
1040         return " ".join(dests.values())
1041