Replacing RM.rtype() for RM.get_type() for consistency
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.ExecReadOnly)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.ExecReadOnly)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.ExecReadOnly)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.ExecReadOnly)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.ExecReadOnly)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.ExecReadOnly)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.ExecReadOnly)
190
191         cls._register_attribute(hostname)
192         cls._register_attribute(username)
193         cls._register_attribute(port)
194         cls._register_attribute(home)
195         cls._register_attribute(identity)
196         cls._register_attribute(server_key)
197         cls._register_attribute(clean_home)
198         cls._register_attribute(clean_experiment)
199         cls._register_attribute(clean_processes)
200         cls._register_attribute(tear_down)
201
202     def __init__(self, ec, guid):
203         super(LinuxNode, self).__init__(ec, guid)
204         self._os = None
205         # home directory at Linux host
206         self._home_dir = ""
207         
208         # lock to prevent concurrent applications on the same node,
209         # to execute commands at the same time. There are potential
210         # concurrency issues when using SSH to a same host from 
211         # multiple threads. There are also possible operational 
212         # issues, e.g. an application querying the existence 
213         # of a file or folder prior to its creation, and another 
214         # application creating the same file or folder in between.
215         self._node_lock = threading.Lock()
216     
217     def log_message(self, msg):
218         return " guid %d - host %s - %s " % (self.guid, 
219                 self.get("hostname"), msg)
220
221     @property
222     def home_dir(self):
223         home = self.get("home") or ""
224         if not home.startswith("/"):
225            home = os.path.join(self._home_dir, home) 
226         return home
227
228     @property
229     def usr_dir(self):
230         return os.path.join(self.home_dir, "nepi-usr")
231
232     @property
233     def lib_dir(self):
234         return os.path.join(self.usr_dir, "lib")
235
236     @property
237     def bin_dir(self):
238         return os.path.join(self.usr_dir, "bin")
239
240     @property
241     def src_dir(self):
242         return os.path.join(self.usr_dir, "src")
243
244     @property
245     def share_dir(self):
246         return os.path.join(self.usr_dir, "share")
247
248     @property
249     def exp_dir(self):
250         return os.path.join(self.home_dir, "nepi-exp")
251
252     @property
253     def exp_home(self):
254         return os.path.join(self.exp_dir, self.ec.exp_id)
255
256     @property
257     def node_home(self):
258         return os.path.join(self.exp_home, "node-%d" % self.guid)
259
260     @property
261     def run_home(self):
262         return os.path.join(self.node_home, self.ec.run_id)
263
264     @property
265     def os(self):
266         if self._os:
267             return self._os
268
269         if (not self.get("hostname") or not self.get("username")):
270             msg = "Can't resolve OS, insufficient data "
271             self.error(msg)
272             raise RuntimeError, msg
273
274         out = self.get_os()
275
276         if out.find("Fedora release 8") == 0:
277             self._os = OSType.FEDORA_8
278         elif out.find("Fedora release 12") == 0:
279             self._os = OSType.FEDORA_12
280         elif out.find("Fedora release 14") == 0:
281             self._os = OSType.FEDORA_14
282         elif out.find("Fedora release") == 0:
283             self._os = OSType.FEDORA
284         elif out.find("Debian") == 0: 
285             self._os = OSType.DEBIAN
286         elif out.find("Ubuntu") ==0:
287             self._os = OSType.UBUNTU
288         else:
289             msg = "Unsupported OS"
290             self.error(msg, out)
291             raise RuntimeError, "%s - %s " %( msg, out )
292
293         return self._os
294
295     def get_os(self):
296         # The underlying SSH layer will sometimes return an empty
297         # output (even if the command was executed without errors).
298         # To work arround this, repeat the operation N times or
299         # until the result is not empty string
300         out = ""
301         try:
302             (out, err), proc = self.execute("cat /etc/issue", 
303                     with_lock = True,
304                     blocking = True)
305         except:
306             trace = traceback.format_exc()
307             msg = "Error detecting OS: %s " % trace
308             self.error(msg, out, err)
309         
310         return out
311
312     @property
313     def use_deb(self):
314         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
315
316     @property
317     def use_rpm(self):
318         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
319                 OSType.FEDORA]
320
321     @property
322     def localhost(self):
323         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
324
325     def do_provision(self):
326         # check if host is alive
327         if not self.is_alive():
328             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
329             self.error(msg)
330             raise RuntimeError, msg
331
332         self.find_home()
333
334         if self.get("cleanProcesses"):
335             self.clean_processes()
336
337         if self.get("cleanHome"):
338             self.clean_home()
339  
340         if self.get("cleanExperiment"):
341             self.clean_experiment()
342     
343         # Create shared directory structure
344         self.mkdir(self.lib_dir)
345         self.mkdir(self.bin_dir)
346         self.mkdir(self.src_dir)
347         self.mkdir(self.share_dir)
348
349         # Create experiment node home directory
350         self.mkdir(self.node_home)
351
352         super(LinuxNode, self).do_provision()
353
354     def do_deploy(self):
355         if self.state == ResourceState.NEW:
356             self.info("Deploying node")
357             self.do_discover()
358             self.do_provision()
359
360         # Node needs to wait until all associated interfaces are 
361         # ready before it can finalize deployment
362         from nepi.resources.linux.interface import LinuxInterface
363         ifaces = self.get_connected(LinuxInterface.get_rtype())
364         for iface in ifaces:
365             if iface.state < ResourceState.READY:
366                 self.ec.schedule(reschedule_delay, self.deploy)
367                 return 
368
369         super(LinuxNode, self).do_deploy()
370
371     def do_release(self):
372         rms = self.get_connected()
373         for rm in rms:
374             # Node needs to wait until all associated RMs are released
375             # before it can be released
376             if rm.state != ResourceState.RELEASED:
377                 self.ec.schedule(reschedule_delay, self.release)
378                 return 
379
380         tear_down = self.get("tearDown")
381         if tear_down:
382             self.execute(tear_down)
383
384         self.clean_processes()
385
386         super(LinuxNode, self).do_release()
387
388     def valid_connection(self, guid):
389         # TODO: Validate!
390         return True
391
392     def clean_processes(self, killer = False):
393         self.info("Cleaning up processes")
394         
395         if killer:
396             # Hardcore kill
397             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
398                 "sudo -S killall python tcpdump || /bin/true ; " +
399                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
400                 "sudo -S killall -u root || /bin/true ; " +
401                 "sudo -S killall -u root || /bin/true ; ")
402         else:
403             # Be gentler...
404             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
405                 "sudo -S killall tcpdump || /bin/true ; " +
406                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
407                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
408
409         out = err = ""
410         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
411
412     def clean_home(self):
413         """ Cleans all NEPI related folders in the Linux host
414         """
415         self.info("Cleaning up home")
416         
417         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
418                 self.home_dir )
419
420         return self.execute(cmd, with_lock = True)
421
422     def clean_experiment(self):
423         """ Cleans all experiment related files in the Linux host.
424         It preserves NEPI files and folders that have a multi experiment
425         scope.
426         """
427         self.info("Cleaning up experiment files")
428         
429         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
430                 self.exp_dir,
431                 self.ec.exp_id )
432             
433         return self.execute(cmd, with_lock = True)
434
435     def execute(self, command,
436             sudo = False,
437             stdin = None, 
438             env = None,
439             tty = False,
440             forward_x11 = False,
441             timeout = None,
442             retry = 3,
443             err_on_timeout = True,
444             connect_timeout = 30,
445             strict_host_checking = False,
446             persistent = True,
447             blocking = True,
448             with_lock = False
449             ):
450         """ Notice that this invocation will block until the
451         execution finishes. If this is not the desired behavior,
452         use 'run' instead."""
453
454         if self.localhost:
455             (out, err), proc = execfuncs.lexec(command, 
456                     user = user,
457                     sudo = sudo,
458                     stdin = stdin,
459                     env = env)
460         else:
461             if with_lock:
462                 with self._node_lock:
463                     (out, err), proc = sshfuncs.rexec(
464                         command, 
465                         host = self.get("hostname"),
466                         user = self.get("username"),
467                         port = self.get("port"),
468                         agent = True,
469                         sudo = sudo,
470                         stdin = stdin,
471                         identity = self.get("identity"),
472                         server_key = self.get("serverKey"),
473                         env = env,
474                         tty = tty,
475                         forward_x11 = forward_x11,
476                         timeout = timeout,
477                         retry = retry,
478                         err_on_timeout = err_on_timeout,
479                         connect_timeout = connect_timeout,
480                         persistent = persistent,
481                         blocking = blocking, 
482                         strict_host_checking = strict_host_checking
483                         )
484             else:
485                 (out, err), proc = sshfuncs.rexec(
486                     command, 
487                     host = self.get("hostname"),
488                     user = self.get("username"),
489                     port = self.get("port"),
490                     agent = True,
491                     sudo = sudo,
492                     stdin = stdin,
493                     identity = self.get("identity"),
494                     server_key = self.get("serverKey"),
495                     env = env,
496                     tty = tty,
497                     forward_x11 = forward_x11,
498                     timeout = timeout,
499                     retry = retry,
500                     err_on_timeout = err_on_timeout,
501                     connect_timeout = connect_timeout,
502                     persistent = persistent,
503                     blocking = blocking, 
504                     strict_host_checking = strict_host_checking
505                     )
506
507         return (out, err), proc
508
509     def run(self, command, home,
510             create_home = False,
511             pidfile = 'pidfile',
512             stdin = None, 
513             stdout = 'stdout', 
514             stderr = 'stderr', 
515             sudo = False,
516             tty = False):
517         
518         self.debug("Running command '%s'" % command)
519         
520         if self.localhost:
521             (out, err), proc = execfuncs.lspawn(command, pidfile, 
522                     stdout = stdout, 
523                     stderr = stderr, 
524                     stdin = stdin, 
525                     home = home, 
526                     create_home = create_home, 
527                     sudo = sudo,
528                     user = user) 
529         else:
530             with self._node_lock:
531                 (out, err), proc = sshfuncs.rspawn(
532                     command,
533                     pidfile = pidfile,
534                     home = home,
535                     create_home = create_home,
536                     stdin = stdin if stdin is not None else '/dev/null',
537                     stdout = stdout if stdout else '/dev/null',
538                     stderr = stderr if stderr else '/dev/null',
539                     sudo = sudo,
540                     host = self.get("hostname"),
541                     user = self.get("username"),
542                     port = self.get("port"),
543                     agent = True,
544                     identity = self.get("identity"),
545                     server_key = self.get("serverKey"),
546                     tty = tty
547                     )
548
549         return (out, err), proc
550
551     def getpid(self, home, pidfile = "pidfile"):
552         if self.localhost:
553             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
554         else:
555             with self._node_lock:
556                 pidtuple = sshfuncs.rgetpid(
557                     os.path.join(home, pidfile),
558                     host = self.get("hostname"),
559                     user = self.get("username"),
560                     port = self.get("port"),
561                     agent = True,
562                     identity = self.get("identity"),
563                     server_key = self.get("serverKey")
564                     )
565         
566         return pidtuple
567
568     def status(self, pid, ppid):
569         if self.localhost:
570             status = execfuncs.lstatus(pid, ppid)
571         else:
572             with self._node_lock:
573                 status = sshfuncs.rstatus(
574                         pid, ppid,
575                         host = self.get("hostname"),
576                         user = self.get("username"),
577                         port = self.get("port"),
578                         agent = True,
579                         identity = self.get("identity"),
580                         server_key = self.get("serverKey")
581                         )
582            
583         return status
584     
585     def kill(self, pid, ppid, sudo = False):
586         out = err = ""
587         proc = None
588         status = self.status(pid, ppid)
589
590         if status == sshfuncs.ProcStatus.RUNNING:
591             if self.localhost:
592                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
593             else:
594                 with self._node_lock:
595                     (out, err), proc = sshfuncs.rkill(
596                         pid, ppid,
597                         host = self.get("hostname"),
598                         user = self.get("username"),
599                         port = self.get("port"),
600                         agent = True,
601                         sudo = sudo,
602                         identity = self.get("identity"),
603                         server_key = self.get("serverKey")
604                         )
605
606         return (out, err), proc
607
608     def copy(self, src, dst):
609         if self.localhost:
610             (out, err), proc = execfuncs.lcopy(source, dest, 
611                     recursive = True,
612                     strict_host_checking = False)
613         else:
614             with self._node_lock:
615                 (out, err), proc = sshfuncs.rcopy(
616                     src, dst, 
617                     port = self.get("port"),
618                     identity = self.get("identity"),
619                     server_key = self.get("serverKey"),
620                     recursive = True,
621                     strict_host_checking = False)
622
623         return (out, err), proc
624
625     def upload(self, src, dst, text = False, overwrite = True):
626         """ Copy content to destination
627
628            src  content to copy. Can be a local file, directory or a list of files
629
630            dst  destination path on the remote host (remote is always self.host)
631
632            text src is text input, it must be stored into a temp file before uploading
633         """
634         # If source is a string input 
635         f = None
636         if text and not os.path.isfile(src):
637             # src is text input that should be uploaded as file
638             # create a temporal file with the content to upload
639             f = tempfile.NamedTemporaryFile(delete=False)
640             f.write(src)
641             f.close()
642             src = f.name
643
644         # If dst files should not be overwritten, check that the files do not
645         # exits already 
646         if overwrite == False:
647             src = self.filter_existing_files(src, dst)
648             if not src:
649                 return ("", ""), None 
650
651         if not self.localhost:
652             # Build destination as <user>@<server>:<path>
653             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
654
655         result = self.copy(src, dst)
656
657         # clean up temp file
658         if f:
659             os.remove(f.name)
660
661         return result
662
663     def download(self, src, dst):
664         if not self.localhost:
665             # Build destination as <user>@<server>:<path>
666             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
667         return self.copy(src, dst)
668
669     def install_packages_command(self, packages):
670         command = ""
671         if self.use_rpm:
672             command = rpmfuncs.install_packages_command(self.os, packages)
673         elif self.use_deb:
674             command = debfuncs.install_packages_command(self.os, packages)
675         else:
676             msg = "Error installing packages ( OS not known ) "
677             self.error(msg, self.os)
678             raise RuntimeError, msg
679
680         return command
681
682     def install_packages(self, packages, home, run_home = None):
683         """ Install packages in the Linux host.
684
685         'home' is the directory to upload the package installation script.
686         'run_home' is the directory from where to execute the script.
687         """
688         command = self.install_packages_command(packages)
689
690         run_home = run_home or home
691
692         (out, err), proc = self.run_and_wait(command, run_home, 
693             shfile = os.path.join(home, "instpkg.sh"),
694             pidfile = "instpkg_pidfile",
695             ecodefile = "instpkg_exitcode",
696             stdout = "instpkg_stdout", 
697             stderr = "instpkg_stderr",
698             overwrite = False,
699             raise_on_error = True)
700
701         return (out, err), proc 
702
703     def remove_packages(self, packages, home, run_home = None):
704         """ Uninstall packages from the Linux host.
705
706         'home' is the directory to upload the package un-installation script.
707         'run_home' is the directory from where to execute the script.
708         """
709         if self.use_rpm:
710             command = rpmfuncs.remove_packages_command(self.os, packages)
711         elif self.use_deb:
712             command = debfuncs.remove_packages_command(self.os, packages)
713         else:
714             msg = "Error removing packages ( OS not known ) "
715             self.error(msg)
716             raise RuntimeError, msg
717
718         run_home = run_home or home
719
720         (out, err), proc = self.run_and_wait(command, run_home, 
721             shfile = os.path.join(home, "rmpkg.sh"),
722             pidfile = "rmpkg_pidfile",
723             ecodefile = "rmpkg_exitcode",
724             stdout = "rmpkg_stdout", 
725             stderr = "rmpkg_stderr",
726             overwrite = False,
727             raise_on_error = True)
728          
729         return (out, err), proc 
730
731     def mkdir(self, path, clean = False):
732         if clean:
733             self.rmdir(path)
734
735         return self.execute("mkdir -p %s" % path, with_lock = True)
736
737     def rmdir(self, path):
738         return self.execute("rm -rf %s" % path, with_lock = True)
739         
740     def run_and_wait(self, command, home, 
741             shfile = "cmd.sh",
742             env = None,
743             overwrite = True,
744             pidfile = "pidfile", 
745             ecodefile = "exitcode", 
746             stdin = None, 
747             stdout = "stdout", 
748             stderr = "stderr", 
749             sudo = False,
750             tty = False,
751             raise_on_error = False):
752         """
753         Uploads the 'command' to a bash script in the host.
754         Then runs the script detached in background in the host, and
755         busy-waites until the script finishes executing.
756         """
757
758         if not shfile.startswith("/"):
759             shfile = os.path.join(home, shfile)
760
761         self.upload_command(command, 
762             shfile = shfile, 
763             ecodefile = ecodefile, 
764             env = env,
765             overwrite = overwrite)
766
767         command = "bash %s" % shfile
768         # run command in background in remote host
769         (out, err), proc = self.run(command, home, 
770                 pidfile = pidfile,
771                 stdin = stdin, 
772                 stdout = stdout, 
773                 stderr = stderr, 
774                 sudo = sudo,
775                 tty = tty)
776
777         # check no errors occurred
778         if proc.poll():
779             msg = " Failed to run command '%s' " % command
780             self.error(msg, out, err)
781             if raise_on_error:
782                 raise RuntimeError, msg
783
784         # Wait for pid file to be generated
785         pid, ppid = self.wait_pid(
786                 home = home, 
787                 pidfile = pidfile, 
788                 raise_on_error = raise_on_error)
789
790         # wait until command finishes to execute
791         self.wait_run(pid, ppid)
792       
793         (eout, err), proc = self.check_errors(home,
794             ecodefile = ecodefile,
795             stderr = stderr)
796
797         # Out is what was written in the stderr file
798         if err:
799             msg = " Failed to run command '%s' " % command
800             self.error(msg, eout, err)
801
802             if raise_on_error:
803                 raise RuntimeError, msg
804
805         (out, oerr), proc = self.check_output(home, stdout)
806         
807         return (out, err), proc
808
809     def exitcode(self, home, ecodefile = "exitcode"):
810         """
811         Get the exit code of an application.
812         Returns an integer value with the exit code 
813         """
814         (out, err), proc = self.check_output(home, ecodefile)
815
816         # Succeeded to open file, return exit code in the file
817         if proc.wait() == 0:
818             try:
819                 return int(out.strip())
820             except:
821                 # Error in the content of the file!
822                 return ExitCode.CORRUPTFILE
823
824         # No such file or directory
825         if proc.returncode == 1:
826             return ExitCode.FILENOTFOUND
827         
828         # Other error from 'cat'
829         return ExitCode.ERROR
830
831     def upload_command(self, command, 
832             shfile = "cmd.sh",
833             ecodefile = "exitcode",
834             overwrite = True,
835             env = None):
836         """ Saves the command as a bash script file in the remote host, and
837         forces to save the exit code of the command execution to the ecodefile
838         """
839
840         if not (command.strip().endswith(";") or command.strip().endswith("&")):
841             command += ";"
842       
843         # The exit code of the command will be stored in ecodefile
844         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
845                 'command': command,
846                 'ecodefile': ecodefile,
847                 } 
848
849         # Export environment
850         environ = self.format_environment(env)
851
852         # Add environ to command
853         command = environ + command
854
855         return self.upload(command, shfile, text = True, overwrite = overwrite)
856
857     def format_environment(self, env, inline = False):
858         """ Formats the environment variables for a command to be executed
859         either as an inline command
860         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
861         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
862         """
863         if not env: return ""
864
865         # Remove extra white spaces
866         env = re.sub(r'\s+', ' ', env.strip())
867
868         sep = ";" if inline else "\n"
869         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
870
871     def check_errors(self, home, 
872             ecodefile = "exitcode", 
873             stderr = "stderr"):
874         """ Checks whether errors occurred while running a command.
875         It first checks the exit code for the command, and only if the
876         exit code is an error one it returns the error output.
877
878         """
879         proc = None
880         err = ""
881
882         # get exit code saved in the 'exitcode' file
883         ecode = self.exitcode(home, ecodefile)
884
885         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
886             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
887         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
888             # The process returned an error code or didn't exist. 
889             # Check standard error.
890             (err, eerr), proc = self.check_output(home, stderr)
891
892             # If the stderr file was not found, assume nothing bad happened,
893             # and just ignore the error.
894             # (cat returns 1 for error "No such file or directory")
895             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
896                 err = "" 
897             
898         return ("", err), proc
899  
900     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
901         """ Waits until the pid file for the command is generated, 
902             and returns the pid and ppid of the process """
903         pid = ppid = None
904         delay = 1.0
905
906         for i in xrange(2):
907             pidtuple = self.getpid(home = home, pidfile = pidfile)
908             
909             if pidtuple:
910                 pid, ppid = pidtuple
911                 break
912             else:
913                 time.sleep(delay)
914                 delay = delay * 1.5
915         else:
916             msg = " Failed to get pid for pidfile %s/%s " % (
917                     home, pidfile )
918             self.error(msg)
919             
920             if raise_on_error:
921                 raise RuntimeError, msg
922
923         return pid, ppid
924
925     def wait_run(self, pid, ppid, trial = 0):
926         """ wait for a remote process to finish execution """
927         delay = 1.0
928
929         while True:
930             status = self.status(pid, ppid)
931             
932             if status is ProcStatus.FINISHED:
933                 break
934             elif status is not ProcStatus.RUNNING:
935                 delay = delay * 1.5
936                 time.sleep(delay)
937                 # If it takes more than 20 seconds to start, then
938                 # asume something went wrong
939                 if delay > 20:
940                     break
941             else:
942                 # The app is running, just wait...
943                 time.sleep(0.5)
944
945     def check_output(self, home, filename):
946         """ Retrives content of file """
947         (out, err), proc = self.execute("cat %s" % 
948             os.path.join(home, filename), retry = 1, with_lock = True)
949         return (out, err), proc
950
951     def is_alive(self):
952         """ Checks if host is responsive
953         """
954         if self.localhost:
955             return True
956
957         out = err = ""
958         msg = "Unresponsive host. Wrong answer. "
959
960         # The underlying SSH layer will sometimes return an empty
961         # output (even if the command was executed without errors).
962         # To work arround this, repeat the operation N times or
963         # until the result is not empty string
964         try:
965             (out, err), proc = self.execute("echo 'ALIVE'",
966                     blocking = True,
967                     with_lock = True)
968     
969             if out.find("ALIVE") > -1:
970                 return True
971         except:
972             trace = traceback.format_exc()
973             msg = "Unresponsive host. Error reaching host: %s " % trace
974
975         self.error(msg, out, err)
976         return False
977
978     def find_home(self):
979         """ Retrieves host home directory
980         """
981         # The underlying SSH layer will sometimes return an empty
982         # output (even if the command was executed without errors).
983         # To work arround this, repeat the operation N times or
984         # until the result is not empty string
985         msg = "Impossible to retrieve HOME directory"
986         try:
987             (out, err), proc = self.execute("echo ${HOME}",
988                     blocking = True,
989                     with_lock = True)
990     
991             if out.strip() != "":
992                 self._home_dir =  out.strip()
993         except:
994             trace = traceback.format_exc()
995             msg = "Impossible to retrieve HOME directory" % trace
996
997         if not self._home_dir:
998             self.error(msg, out, err)
999             raise RuntimeError, msg
1000
1001     def filter_existing_files(self, src, dst):
1002         """ Removes files that already exist in the Linux host from src list
1003         """
1004         # construct a dictionary with { dst: src }
1005         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
1006             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1007
1008         command = []
1009         for d in dests.keys():
1010             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1011
1012         command = ";".join(command)
1013
1014         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1015     
1016         for d in dests.keys():
1017             if out.find(d) > -1:
1018                 del dests[d]
1019
1020         if not dests:
1021             return ""
1022
1023         return " ".join(dests.values())
1024