NS3Client: replacing socat for ssh
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.Design)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.Design)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.Design)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.Design)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.Design)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.Design)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.Design)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.Design)
190
191         cls._register_attribute(hostname)
192         cls._register_attribute(username)
193         cls._register_attribute(port)
194         cls._register_attribute(home)
195         cls._register_attribute(identity)
196         cls._register_attribute(server_key)
197         cls._register_attribute(clean_home)
198         cls._register_attribute(clean_experiment)
199         cls._register_attribute(clean_processes)
200         cls._register_attribute(tear_down)
201
202     def __init__(self, ec, guid):
203         super(LinuxNode, self).__init__(ec, guid)
204         self._os = None
205         # home directory at Linux host
206         self._home_dir = ""
207         
208         # lock to prevent concurrent applications on the same node,
209         # to execute commands at the same time. There are potential
210         # concurrency issues when using SSH to a same host from 
211         # multiple threads. There are also possible operational 
212         # issues, e.g. an application querying the existence 
213         # of a file or folder prior to its creation, and another 
214         # application creating the same file or folder in between.
215         self._node_lock = threading.Lock()
216     
217     def log_message(self, msg):
218         return " guid %d - host %s - %s " % (self.guid, 
219                 self.get("hostname"), msg)
220
221     @property
222     def home_dir(self):
223         home = self.get("home") or ""
224         if not home.startswith("/"):
225            home = os.path.join(self._home_dir, home) 
226         return home
227
228     @property
229     def usr_dir(self):
230         return os.path.join(self.home_dir, "nepi-usr")
231
232     @property
233     def lib_dir(self):
234         return os.path.join(self.usr_dir, "lib")
235
236     @property
237     def bin_dir(self):
238         return os.path.join(self.usr_dir, "bin")
239
240     @property
241     def src_dir(self):
242         return os.path.join(self.usr_dir, "src")
243
244     @property
245     def share_dir(self):
246         return os.path.join(self.usr_dir, "share")
247
248     @property
249     def exp_dir(self):
250         return os.path.join(self.home_dir, "nepi-exp")
251
252     @property
253     def exp_home(self):
254         return os.path.join(self.exp_dir, self.ec.exp_id)
255
256     @property
257     def node_home(self):
258         return os.path.join(self.exp_home, "node-%d" % self.guid)
259
260     @property
261     def run_home(self):
262         return os.path.join(self.node_home, self.ec.run_id)
263
264     @property
265     def os(self):
266         if self._os:
267             return self._os
268
269         if (not self.get("hostname") or not self.get("username")):
270             msg = "Can't resolve OS, insufficient data "
271             self.error(msg)
272             raise RuntimeError, msg
273
274         out = self.get_os()
275
276         if out.find("Fedora release 8") == 0:
277             self._os = OSType.FEDORA_8
278         elif out.find("Fedora release 12") == 0:
279             self._os = OSType.FEDORA_12
280         elif out.find("Fedora release 14") == 0:
281             self._os = OSType.FEDORA_14
282         elif out.find("Fedora release") == 0:
283             self._os = OSType.FEDORA
284         elif out.find("Debian") == 0: 
285             self._os = OSType.DEBIAN
286         elif out.find("Ubuntu") ==0:
287             self._os = OSType.UBUNTU
288         else:
289             msg = "Unsupported OS"
290             self.error(msg, out)
291             raise RuntimeError, "%s - %s " %( msg, out )
292
293         return self._os
294
295     def get_os(self):
296         # The underlying SSH layer will sometimes return an empty
297         # output (even if the command was executed without errors).
298         # To work arround this, repeat the operation N times or
299         # until the result is not empty string
300         out = ""
301         try:
302             (out, err), proc = self.execute("cat /etc/issue", 
303                     with_lock = True,
304                     blocking = True)
305         except:
306             trace = traceback.format_exc()
307             msg = "Error detecting OS: %s " % trace
308             self.error(msg, out, err)
309         
310         return out
311
312     @property
313     def use_deb(self):
314         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
315
316     @property
317     def use_rpm(self):
318         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
319                 OSType.FEDORA]
320
321     @property
322     def localhost(self):
323         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
324
325     def do_provision(self):
326         # check if host is alive
327         if not self.is_alive():
328             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
329             self.error(msg)
330             raise RuntimeError, msg
331
332         self.find_home()
333
334         if self.get("cleanProcesses"):
335             self.clean_processes()
336
337         if self.get("cleanHome"):
338             self.clean_home()
339  
340         if self.get("cleanExperiment"):
341             self.clean_experiment()
342     
343         # Create shared directory structure
344         self.mkdir(self.lib_dir)
345         self.mkdir(self.bin_dir)
346         self.mkdir(self.src_dir)
347         self.mkdir(self.share_dir)
348
349         # Create experiment node home directory
350         self.mkdir(self.node_home)
351
352         super(LinuxNode, self).do_provision()
353
354     def do_deploy(self):
355         if self.state == ResourceState.NEW:
356             self.info("Deploying node")
357             self.do_discover()
358             self.do_provision()
359
360         # Node needs to wait until all associated interfaces are 
361         # ready before it can finalize deployment
362         from nepi.resources.linux.interface import LinuxInterface
363         ifaces = self.get_connected(LinuxInterface.get_rtype())
364         for iface in ifaces:
365             if iface.state < ResourceState.READY:
366                 self.ec.schedule(reschedule_delay, self.deploy)
367                 return 
368
369         super(LinuxNode, self).do_deploy()
370
371     def do_release(self):
372         rms = self.get_connected()
373         for rm in rms:
374             # Node needs to wait until all associated RMs are released
375             # before it can be released
376             if rm.state != ResourceState.RELEASED:
377                 self.ec.schedule(reschedule_delay, self.release)
378                 return 
379
380         tear_down = self.get("tearDown")
381         if tear_down:
382             self.execute(tear_down)
383
384         self.clean_processes()
385
386         super(LinuxNode, self).do_release()
387
388     def valid_connection(self, guid):
389         # TODO: Validate!
390         return True
391
392     def clean_processes(self):
393         self.info("Cleaning up processes")
394         
395         cmd = ("sudo -S killall tcpdump || /bin/true ; " +
396             "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
397             "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
398
399         out = err = ""
400         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
401
402     def clean_home(self):
403         """ Cleans all NEPI related folders in the Linux host
404         """
405         self.info("Cleaning up home")
406         
407         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
408                 self.home_dir )
409
410         return self.execute(cmd, with_lock = True)
411
412     def clean_experiment(self):
413         """ Cleans all experiment related files in the Linux host.
414         It preserves NEPI files and folders that have a multi experiment
415         scope.
416         """
417         self.info("Cleaning up experiment files")
418         
419         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
420                 self.exp_dir,
421                 self.ec.exp_id )
422             
423         return self.execute(cmd, with_lock = True)
424
425     def execute(self, command,
426             sudo = False,
427             env = None,
428             tty = False,
429             forward_x11 = False,
430             retry = 3,
431             connect_timeout = 30,
432             strict_host_checking = False,
433             persistent = True,
434             blocking = True,
435             with_lock = False
436             ):
437         """ Notice that this invocation will block until the
438         execution finishes. If this is not the desired behavior,
439         use 'run' instead."""
440
441         if self.localhost:
442             (out, err), proc = execfuncs.lexec(command, 
443                     user = self.get("username"), # still problem with localhost
444                     sudo = sudo,
445                     env = env)
446         else:
447             if with_lock:
448                 # If the execute command is blocking, we don't want to keep
449                 # the node lock. This lock is used to avoid race conditions
450                 # when creating the ControlMaster sockets. A more elegant
451                 # solution is needed.
452                 with self._node_lock:
453                     (out, err), proc = sshfuncs.rexec(
454                         command, 
455                         host = self.get("hostname"),
456                         user = self.get("username"),
457                         port = self.get("port"),
458                         agent = True,
459                         sudo = sudo,
460                         identity = self.get("identity"),
461                         server_key = self.get("serverKey"),
462                         env = env,
463                         tty = tty,
464                         forward_x11 = forward_x11,
465                         retry = retry,
466                         connect_timeout = connect_timeout,
467                         persistent = persistent,
468                         blocking = blocking, 
469                         strict_host_checking = strict_host_checking
470                         )
471             else:
472                 (out, err), proc = sshfuncs.rexec(
473                     command, 
474                     host = self.get("hostname"),
475                     user = self.get("username"),
476                     port = self.get("port"),
477                     agent = True,
478                     sudo = sudo,
479                     identity = self.get("identity"),
480                     server_key = self.get("serverKey"),
481                     env = env,
482                     tty = tty,
483                     forward_x11 = forward_x11,
484                     retry = retry,
485                     connect_timeout = connect_timeout,
486                     persistent = persistent,
487                     blocking = blocking, 
488                     strict_host_checking = strict_host_checking
489                     )
490
491         return (out, err), proc
492
493     def run(self, command, home,
494             create_home = False,
495             pidfile = 'pidfile',
496             stdin = None, 
497             stdout = 'stdout', 
498             stderr = 'stderr', 
499             sudo = False,
500             tty = False):
501         
502         self.debug("Running command '%s'" % command)
503         
504         if self.localhost:
505             (out, err), proc = execfuncs.lspawn(command, pidfile, 
506                     stdout = stdout, 
507                     stderr = stderr, 
508                     stdin = stdin, 
509                     home = home, 
510                     create_home = create_home, 
511                     sudo = sudo,
512                     user = user) 
513         else:
514             with self._node_lock:
515                 (out, err), proc = sshfuncs.rspawn(
516                     command,
517                     pidfile = pidfile,
518                     home = home,
519                     create_home = create_home,
520                     stdin = stdin if stdin is not None else '/dev/null',
521                     stdout = stdout if stdout else '/dev/null',
522                     stderr = stderr if stderr else '/dev/null',
523                     sudo = sudo,
524                     host = self.get("hostname"),
525                     user = self.get("username"),
526                     port = self.get("port"),
527                     agent = True,
528                     identity = self.get("identity"),
529                     server_key = self.get("serverKey"),
530                     tty = tty
531                     )
532
533         return (out, err), proc
534
535     def getpid(self, home, pidfile = "pidfile"):
536         if self.localhost:
537             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
538         else:
539             with self._node_lock:
540                 pidtuple = sshfuncs.rgetpid(
541                     os.path.join(home, pidfile),
542                     host = self.get("hostname"),
543                     user = self.get("username"),
544                     port = self.get("port"),
545                     agent = True,
546                     identity = self.get("identity"),
547                     server_key = self.get("serverKey")
548                     )
549         
550         return pidtuple
551
552     def status(self, pid, ppid):
553         if self.localhost:
554             status = execfuncs.lstatus(pid, ppid)
555         else:
556             with self._node_lock:
557                 status = sshfuncs.rstatus(
558                         pid, ppid,
559                         host = self.get("hostname"),
560                         user = self.get("username"),
561                         port = self.get("port"),
562                         agent = True,
563                         identity = self.get("identity"),
564                         server_key = self.get("serverKey")
565                         )
566            
567         return status
568     
569     def kill(self, pid, ppid, sudo = False):
570         out = err = ""
571         proc = None
572         status = self.status(pid, ppid)
573
574         if status == sshfuncs.ProcStatus.RUNNING:
575             if self.localhost:
576                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
577             else:
578                 with self._node_lock:
579                     (out, err), proc = sshfuncs.rkill(
580                         pid, ppid,
581                         host = self.get("hostname"),
582                         user = self.get("username"),
583                         port = self.get("port"),
584                         agent = True,
585                         sudo = sudo,
586                         identity = self.get("identity"),
587                         server_key = self.get("serverKey")
588                         )
589
590         return (out, err), proc
591
592     def copy(self, src, dst):
593         if self.localhost:
594             (out, err), proc = execfuncs.lcopy(source, dest, 
595                     recursive = True,
596                     strict_host_checking = False)
597         else:
598             (out, err), proc = sshfuncs.rcopy(
599                 src, dst, 
600                 port = self.get("port"),
601                 identity = self.get("identity"),
602                 server_key = self.get("serverKey"),
603                 recursive = True,
604                 strict_host_checking = False)
605
606         return (out, err), proc
607
608     def upload(self, src, dst, text = False, overwrite = True):
609         """ Copy content to destination
610
611            src  content to copy. Can be a local file, directory or a list of files
612
613            dst  destination path on the remote host (remote is always self.host)
614
615            text src is text input, it must be stored into a temp file before uploading
616         """
617         # If source is a string input 
618         f = None
619         if text and not os.path.isfile(src):
620             # src is text input that should be uploaded as file
621             # create a temporal file with the content to upload
622             f = tempfile.NamedTemporaryFile(delete=False)
623             f.write(src)
624             f.close()
625             src = f.name
626
627         # If dst files should not be overwritten, check that the files do not
628         # exits already 
629         if overwrite == False:
630             src = self.filter_existing_files(src, dst)
631             if not src:
632                 return ("", ""), None 
633
634         if not self.localhost:
635             # Build destination as <user>@<server>:<path>
636             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
637
638         result = self.copy(src, dst)
639
640         # clean up temp file
641         if f:
642             os.remove(f.name)
643
644         return result
645
646     def download(self, src, dst):
647         if not self.localhost:
648             # Build destination as <user>@<server>:<path>
649             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
650         return self.copy(src, dst)
651
652     def install_packages_command(self, packages):
653         command = ""
654         if self.use_rpm:
655             command = rpmfuncs.install_packages_command(self.os, packages)
656         elif self.use_deb:
657             command = debfuncs.install_packages_command(self.os, packages)
658         else:
659             msg = "Error installing packages ( OS not known ) "
660             self.error(msg, self.os)
661             raise RuntimeError, msg
662
663         return command
664
665     def install_packages(self, packages, home, run_home = None):
666         """ Install packages in the Linux host.
667
668         'home' is the directory to upload the package installation script.
669         'run_home' is the directory from where to execute the script.
670         """
671         command = self.install_packages_command(packages)
672
673         run_home = run_home or home
674
675         (out, err), proc = self.run_and_wait(command, run_home, 
676             shfile = os.path.join(home, "instpkg.sh"),
677             pidfile = "instpkg_pidfile",
678             ecodefile = "instpkg_exitcode",
679             stdout = "instpkg_stdout", 
680             stderr = "instpkg_stderr",
681             overwrite = False,
682             raise_on_error = True)
683
684         return (out, err), proc 
685
686     def remove_packages(self, packages, home, run_home = None):
687         """ Uninstall packages from the Linux host.
688
689         'home' is the directory to upload the package un-installation script.
690         'run_home' is the directory from where to execute the script.
691         """
692         if self.use_rpm:
693             command = rpmfuncs.remove_packages_command(self.os, packages)
694         elif self.use_deb:
695             command = debfuncs.remove_packages_command(self.os, packages)
696         else:
697             msg = "Error removing packages ( OS not known ) "
698             self.error(msg)
699             raise RuntimeError, msg
700
701         run_home = run_home or home
702
703         (out, err), proc = self.run_and_wait(command, run_home, 
704             shfile = os.path.join(home, "rmpkg.sh"),
705             pidfile = "rmpkg_pidfile",
706             ecodefile = "rmpkg_exitcode",
707             stdout = "rmpkg_stdout", 
708             stderr = "rmpkg_stderr",
709             overwrite = False,
710             raise_on_error = True)
711          
712         return (out, err), proc 
713
714     def mkdir(self, path, clean = False):
715         if clean:
716             self.rmdir(path)
717
718         return self.execute("mkdir -p %s" % path, with_lock = True)
719
720     def rmdir(self, path):
721         return self.execute("rm -rf %s" % path, with_lock = True)
722         
723     def run_and_wait(self, command, home, 
724             shfile = "cmd.sh",
725             env = None,
726             overwrite = True,
727             pidfile = "pidfile", 
728             ecodefile = "exitcode", 
729             stdin = None, 
730             stdout = "stdout", 
731             stderr = "stderr", 
732             sudo = False,
733             tty = False,
734             raise_on_error = False):
735         """
736         Uploads the 'command' to a bash script in the host.
737         Then runs the script detached in background in the host, and
738         busy-waites until the script finishes executing.
739         """
740
741         if not shfile.startswith("/"):
742             shfile = os.path.join(home, shfile)
743
744         self.upload_command(command, 
745             shfile = shfile, 
746             ecodefile = ecodefile, 
747             env = env,
748             overwrite = overwrite)
749
750         command = "bash %s" % shfile
751         # run command in background in remote host
752         (out, err), proc = self.run(command, home, 
753                 pidfile = pidfile,
754                 stdin = stdin, 
755                 stdout = stdout, 
756                 stderr = stderr, 
757                 sudo = sudo,
758                 tty = tty)
759
760         # check no errors occurred
761         if proc.poll():
762             msg = " Failed to run command '%s' " % command
763             self.error(msg, out, err)
764             if raise_on_error:
765                 raise RuntimeError, msg
766
767         # Wait for pid file to be generated
768         pid, ppid = self.wait_pid(
769                 home = home, 
770                 pidfile = pidfile, 
771                 raise_on_error = raise_on_error)
772
773         # wait until command finishes to execute
774         self.wait_run(pid, ppid)
775       
776         (eout, err), proc = self.check_errors(home,
777             ecodefile = ecodefile,
778             stderr = stderr)
779
780         # Out is what was written in the stderr file
781         if err:
782             msg = " Failed to run command '%s' " % command
783             self.error(msg, eout, err)
784
785             if raise_on_error:
786                 raise RuntimeError, msg
787
788         (out, oerr), proc = self.check_output(home, stdout)
789         
790         return (out, err), proc
791
792     def exitcode(self, home, ecodefile = "exitcode"):
793         """
794         Get the exit code of an application.
795         Returns an integer value with the exit code 
796         """
797         (out, err), proc = self.check_output(home, ecodefile)
798
799         # Succeeded to open file, return exit code in the file
800         if proc.wait() == 0:
801             try:
802                 return int(out.strip())
803             except:
804                 # Error in the content of the file!
805                 return ExitCode.CORRUPTFILE
806
807         # No such file or directory
808         if proc.returncode == 1:
809             return ExitCode.FILENOTFOUND
810         
811         # Other error from 'cat'
812         return ExitCode.ERROR
813
814     def upload_command(self, command, 
815             shfile = "cmd.sh",
816             ecodefile = "exitcode",
817             overwrite = True,
818             env = None):
819         """ Saves the command as a bash script file in the remote host, and
820         forces to save the exit code of the command execution to the ecodefile
821         """
822
823         if not (command.strip().endswith(";") or command.strip().endswith("&")):
824             command += ";"
825       
826         # The exit code of the command will be stored in ecodefile
827         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
828                 'command': command,
829                 'ecodefile': ecodefile,
830                 } 
831
832         # Export environment
833         environ = self.format_environment(env)
834
835         # Add environ to command
836         command = environ + command
837
838         return self.upload(command, shfile, text = True, overwrite = overwrite)
839
840     def format_environment(self, env, inline = False):
841         """ Formats the environment variables for a command to be executed
842         either as an inline command
843         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
844         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
845         """
846         if not env: return ""
847
848         # Remove extra white spaces
849         env = re.sub(r'\s+', ' ', env.strip())
850
851         sep = ";" if inline else "\n"
852         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
853
854     def check_errors(self, home, 
855             ecodefile = "exitcode", 
856             stderr = "stderr"):
857         """ Checks whether errors occurred while running a command.
858         It first checks the exit code for the command, and only if the
859         exit code is an error one it returns the error output.
860
861         """
862         proc = None
863         err = ""
864
865         # get exit code saved in the 'exitcode' file
866         ecode = self.exitcode(home, ecodefile)
867
868         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
869             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
870         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
871             # The process returned an error code or didn't exist. 
872             # Check standard error.
873             (err, eerr), proc = self.check_output(home, stderr)
874
875             # If the stderr file was not found, assume nothing bad happened,
876             # and just ignore the error.
877             # (cat returns 1 for error "No such file or directory")
878             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
879                 err = "" 
880             
881         return ("", err), proc
882  
883     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
884         """ Waits until the pid file for the command is generated, 
885             and returns the pid and ppid of the process """
886         pid = ppid = None
887         delay = 1.0
888
889         for i in xrange(2):
890             pidtuple = self.getpid(home = home, pidfile = pidfile)
891             
892             if pidtuple:
893                 pid, ppid = pidtuple
894                 break
895             else:
896                 time.sleep(delay)
897                 delay = delay * 1.5
898         else:
899             msg = " Failed to get pid for pidfile %s/%s " % (
900                     home, pidfile )
901             self.error(msg)
902             
903             if raise_on_error:
904                 raise RuntimeError, msg
905
906         return pid, ppid
907
908     def wait_run(self, pid, ppid, trial = 0):
909         """ wait for a remote process to finish execution """
910         delay = 1.0
911
912         while True:
913             status = self.status(pid, ppid)
914             
915             if status is ProcStatus.FINISHED:
916                 break
917             elif status is not ProcStatus.RUNNING:
918                 delay = delay * 1.5
919                 time.sleep(delay)
920                 # If it takes more than 20 seconds to start, then
921                 # asume something went wrong
922                 if delay > 20:
923                     break
924             else:
925                 # The app is running, just wait...
926                 time.sleep(0.5)
927
928     def check_output(self, home, filename):
929         """ Retrives content of file """
930         (out, err), proc = self.execute("cat %s" % 
931             os.path.join(home, filename), retry = 1, with_lock = True)
932         return (out, err), proc
933
934     def is_alive(self):
935         """ Checks if host is responsive
936         """
937         if self.localhost:
938             return True
939
940         out = err = ""
941         msg = "Unresponsive host. Wrong answer. "
942
943         # The underlying SSH layer will sometimes return an empty
944         # output (even if the command was executed without errors).
945         # To work arround this, repeat the operation N times or
946         # until the result is not empty string
947         try:
948             (out, err), proc = self.execute("echo 'ALIVE'",
949                     blocking = True,
950                     with_lock = True)
951     
952             if out.find("ALIVE") > -1:
953                 return True
954         except:
955             trace = traceback.format_exc()
956             msg = "Unresponsive host. Error reaching host: %s " % trace
957
958         self.error(msg, out, err)
959         return False
960
961     def find_home(self):
962         """ Retrieves host home directory
963         """
964         # The underlying SSH layer will sometimes return an empty
965         # output (even if the command was executed without errors).
966         # To work arround this, repeat the operation N times or
967         # until the result is not empty string
968         msg = "Impossible to retrieve HOME directory"
969         try:
970             (out, err), proc = self.execute("echo ${HOME}",
971                     blocking = True,
972                     with_lock = True)
973     
974             if out.strip() != "":
975                 self._home_dir =  out.strip()
976         except:
977             trace = traceback.format_exc()
978             msg = "Impossible to retrieve HOME directory %s" % trace
979
980         if not self._home_dir:
981             self.error(msg)
982             raise RuntimeError, msg
983
984     def filter_existing_files(self, src, dst):
985         """ Removes files that already exist in the Linux host from src list
986         """
987         # construct a dictionary with { dst: src }
988         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
989             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
990
991         command = []
992         for d in dests.keys():
993             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
994
995         command = ";".join(command)
996
997         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
998     
999         for d in dests.keys():
1000             if out.find(d) > -1:
1001                 del dests[d]
1002
1003         if not dests:
1004             return ""
1005
1006         return " ".join(dests.values())
1007