ec_shutdown
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
22         reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.ExecReadOnly)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.ExecReadOnly)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.ExecReadOnly)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 flags = Flags.ExecReadOnly)
172
173         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
174                 " from a previous same experiment, before the new experiment starts", 
175                 flags = Flags.ExecReadOnly)
176         
177         clean_processes = Attribute("cleanProcesses", 
178                 "Kill all running processes before starting experiment",
179                 flags = Flags.ExecReadOnly)
180         
181         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
182                 "releasing the resource",
183                 flags = Flags.ExecReadOnly)
184
185         cls._register_attribute(hostname)
186         cls._register_attribute(username)
187         cls._register_attribute(port)
188         cls._register_attribute(home)
189         cls._register_attribute(identity)
190         cls._register_attribute(server_key)
191         cls._register_attribute(clean_home)
192         cls._register_attribute(clean_experiment)
193         cls._register_attribute(clean_processes)
194         cls._register_attribute(tear_down)
195
196     def __init__(self, ec, guid):
197         super(LinuxNode, self).__init__(ec, guid)
198         self._os = None
199         # home directory at Linux host
200         self._home_dir = ""
201         
202         # lock to prevent concurrent applications on the same node,
203         # to execute commands at the same time. There are potential
204         # concurrency issues when using SSH to a same host from 
205         # multiple threads. There are also possible operational 
206         # issues, e.g. an application querying the existence 
207         # of a file or folder prior to its creation, and another 
208         # application creating the same file or folder in between.
209         self._node_lock = threading.Lock()
210     
211     def log_message(self, msg):
212         return " guid %d - host %s - %s " % (self.guid, 
213                 self.get("hostname"), msg)
214
215     @property
216     def home_dir(self):
217         home = self.get("home") or ""
218         if not home.startswith("/"):
219            home = os.path.join(self._home_dir, home) 
220         return home
221
222     @property
223     def usr_dir(self):
224         return os.path.join(self.home_dir, "nepi-usr")
225
226     @property
227     def lib_dir(self):
228         return os.path.join(self.usr_dir, "lib")
229
230     @property
231     def bin_dir(self):
232         return os.path.join(self.usr_dir, "bin")
233
234     @property
235     def src_dir(self):
236         return os.path.join(self.usr_dir, "src")
237
238     @property
239     def share_dir(self):
240         return os.path.join(self.usr_dir, "share")
241
242     @property
243     def exp_dir(self):
244         return os.path.join(self.home_dir, "nepi-exp")
245
246     @property
247     def exp_home(self):
248         return os.path.join(self.exp_dir, self.ec.exp_id)
249
250     @property
251     def node_home(self):
252         return os.path.join(self.exp_home, "node-%d" % self.guid)
253
254     @property
255     def run_home(self):
256         return os.path.join(self.node_home, self.ec.run_id)
257
258     @property
259     def os(self):
260         if self._os:
261             return self._os
262
263         if (not self.get("hostname") or not self.get("username")):
264             msg = "Can't resolve OS, insufficient data "
265             self.error(msg)
266             raise RuntimeError, msg
267
268         out = self.get_os()
269
270         if out.find("Fedora release 8") == 0:
271             self._os = OSType.FEDORA_8
272         elif out.find("Fedora release 12") == 0:
273             self._os = OSType.FEDORA_12
274         elif out.find("Fedora release 14") == 0:
275             self._os = OSType.FEDORA_14
276         elif out.find("Debian") == 0: 
277             self._os = OSType.DEBIAN
278         elif out.find("Ubuntu") ==0:
279             self._os = OSType.UBUNTU
280         else:
281             msg = "Unsupported OS"
282             self.error(msg, out)
283             raise RuntimeError, "%s - %s " %( msg, out )
284
285         return self._os
286
287     def get_os(self):
288         # The underlying SSH layer will sometimes return an empty
289         # output (even if the command was executed without errors).
290         # To work arround this, repeat the operation N times or
291         # until the result is not empty string
292         out = ""
293         retrydelay = 1.0
294         for i in xrange(10):
295             try:
296                 (out, err), proc = self.execute("cat /etc/issue", 
297                         retry = 5,
298                         with_lock = True,
299                         blocking = True)
300
301                 if out.strip() != "":
302                     return out
303             except:
304                 trace = traceback.format_exc()
305                 msg = "Error detecting OS: %s " % trace
306                 self.error(msg, out, err)
307                 return False
308
309             time.sleep(min(30.0, retrydelay))
310             retrydelay *= 1.5
311
312
313     @property
314     def use_deb(self):
315         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
316
317     @property
318     def use_rpm(self):
319         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
320                 OSType.FEDORA]
321
322     @property
323     def localhost(self):
324         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
325
326     def provision(self):
327         # check if host is alive
328         if not self.is_alive():
329             
330             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
331             self.error(msg)
332             raise RuntimeError, msg
333
334         self.find_home()
335
336         if self.get("cleanProcesses"):
337             self.clean_processes()
338
339         if self.get("cleanHome"):
340             self.clean_home()
341  
342         if self.get("cleanExperiment"):
343             self.clean_experiment()
344     
345         # Create shared directory structure
346         self.mkdir(self.lib_dir)
347         self.mkdir(self.bin_dir)
348         self.mkdir(self.src_dir)
349         self.mkdir(self.share_dir)
350
351         # Create experiment node home directory
352         self.mkdir(self.node_home)
353
354         super(LinuxNode, self).provision()
355
356     def deploy(self):
357         if self.state == ResourceState.NEW:
358             try:
359                 self.discover()
360                 self.provision()
361             except:
362                 self.fail()
363                 return
364
365         # Node needs to wait until all associated interfaces are 
366         # ready before it can finalize deployment
367         from nepi.resources.linux.interface import LinuxInterface
368         ifaces = self.get_connected(LinuxInterface.rtype())
369         for iface in ifaces:
370             if iface.state < ResourceState.READY:
371                 self.ec.schedule(reschedule_delay, self.deploy)
372                 return 
373
374         super(LinuxNode, self).deploy()
375
376     def release(self):
377         # Node needs to wait until all associated RMs are released
378         # to be released
379         rms = self.get_connected()
380         for rm in rms:
381             if rm.state < ResourceState.STOPPED:
382                 self.ec.schedule(reschedule_delay, self.release)
383                 return 
384
385         tear_down = self.get("tearDown")
386         if tear_down:
387             self.execute(tear_down)
388
389         self.clean_processes()
390
391         super(LinuxNode, self).release()
392
393     def valid_connection(self, guid):
394         # TODO: Validate!
395         return True
396
397     def clean_processes(self, killer = False):
398         self.info("Cleaning up processes")
399         
400         if killer:
401             # Hardcore kill
402             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
403                 "sudo -S killall python tcpdump || /bin/true ; " +
404                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
405                 "sudo -S killall -u root || /bin/true ; " +
406                 "sudo -S killall -u root || /bin/true ; ")
407         else:
408             # Be gentler...
409             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
410                 "sudo -S killall tcpdump || /bin/true ; " +
411                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
412                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
413
414         out = err = ""
415         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
416             
417     def clean_home(self):
418         """ Cleans all NEPI related folders in the Linux host
419         """
420         self.info("Cleaning up home")
421         
422         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
423                 self.home_dir )
424
425         return self.execute(cmd, with_lock = True)
426
427     def clean_experiment(self):
428         """ Cleans all experiment related files in the Linux host.
429         It preserves NEPI files and folders that have a multi experiment
430         scope.
431         """
432         self.info("Cleaning up experiment files")
433         
434         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
435                 self.exp_dir,
436                 self.ec.exp_id )
437             
438         return self.execute(cmd, with_lock = True)
439
440     def execute(self, command,
441             sudo = False,
442             stdin = None, 
443             env = None,
444             tty = False,
445             forward_x11 = False,
446             timeout = None,
447             retry = 3,
448             err_on_timeout = True,
449             connect_timeout = 30,
450             strict_host_checking = False,
451             persistent = True,
452             blocking = True,
453             with_lock = False
454             ):
455         """ Notice that this invocation will block until the
456         execution finishes. If this is not the desired behavior,
457         use 'run' instead."""
458
459         if self.localhost:
460             (out, err), proc = execfuncs.lexec(command, 
461                     user = user,
462                     sudo = sudo,
463                     stdin = stdin,
464                     env = env)
465         else:
466             if with_lock:
467                 with self._node_lock:
468                     (out, err), proc = sshfuncs.rexec(
469                         command, 
470                         host = self.get("hostname"),
471                         user = self.get("username"),
472                         port = self.get("port"),
473                         agent = True,
474                         sudo = sudo,
475                         stdin = stdin,
476                         identity = self.get("identity"),
477                         server_key = self.get("serverKey"),
478                         env = env,
479                         tty = tty,
480                         forward_x11 = forward_x11,
481                         timeout = timeout,
482                         retry = retry,
483                         err_on_timeout = err_on_timeout,
484                         connect_timeout = connect_timeout,
485                         persistent = persistent,
486                         blocking = blocking, 
487                         strict_host_checking = strict_host_checking
488                         )
489             else:
490                 (out, err), proc = sshfuncs.rexec(
491                     command, 
492                     host = self.get("hostname"),
493                     user = self.get("username"),
494                     port = self.get("port"),
495                     agent = True,
496                     sudo = sudo,
497                     stdin = stdin,
498                     identity = self.get("identity"),
499                     server_key = self.get("serverKey"),
500                     env = env,
501                     tty = tty,
502                     forward_x11 = forward_x11,
503                     timeout = timeout,
504                     retry = retry,
505                     err_on_timeout = err_on_timeout,
506                     connect_timeout = connect_timeout,
507                     persistent = persistent,
508                     blocking = blocking, 
509                     strict_host_checking = strict_host_checking
510                     )
511
512         return (out, err), proc
513
514     def run(self, command, home,
515             create_home = False,
516             pidfile = 'pidfile',
517             stdin = None, 
518             stdout = 'stdout', 
519             stderr = 'stderr', 
520             sudo = False,
521             tty = False):
522         
523         self.debug("Running command '%s'" % command)
524         
525         if self.localhost:
526             (out, err), proc = execfuncs.lspawn(command, pidfile, 
527                     stdout = stdout, 
528                     stderr = stderr, 
529                     stdin = stdin, 
530                     home = home, 
531                     create_home = create_home, 
532                     sudo = sudo,
533                     user = user) 
534         else:
535             with self._node_lock:
536                 (out, err), proc = sshfuncs.rspawn(
537                     command,
538                     pidfile = pidfile,
539                     home = home,
540                     create_home = create_home,
541                     stdin = stdin if stdin is not None else '/dev/null',
542                     stdout = stdout if stdout else '/dev/null',
543                     stderr = stderr if stderr else '/dev/null',
544                     sudo = sudo,
545                     host = self.get("hostname"),
546                     user = self.get("username"),
547                     port = self.get("port"),
548                     agent = True,
549                     identity = self.get("identity"),
550                     server_key = self.get("serverKey"),
551                     tty = tty
552                     )
553
554         return (out, err), proc
555
556     def getpid(self, home, pidfile = "pidfile"):
557         if self.localhost:
558             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
559         else:
560             with self._node_lock:
561                 pidtuple = sshfuncs.rgetpid(
562                     os.path.join(home, pidfile),
563                     host = self.get("hostname"),
564                     user = self.get("username"),
565                     port = self.get("port"),
566                     agent = True,
567                     identity = self.get("identity"),
568                     server_key = self.get("serverKey")
569                     )
570         
571         return pidtuple
572
573     def status(self, pid, ppid):
574         if self.localhost:
575             status = execfuncs.lstatus(pid, ppid)
576         else:
577             with self._node_lock:
578                 status = sshfuncs.rstatus(
579                         pid, ppid,
580                         host = self.get("hostname"),
581                         user = self.get("username"),
582                         port = self.get("port"),
583                         agent = True,
584                         identity = self.get("identity"),
585                         server_key = self.get("serverKey")
586                         )
587            
588         return status
589     
590     def kill(self, pid, ppid, sudo = False):
591         out = err = ""
592         proc = None
593         status = self.status(pid, ppid)
594
595         if status == sshfuncs.ProcStatus.RUNNING:
596             if self.localhost:
597                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
598             else:
599                 with self._node_lock:
600                     (out, err), proc = sshfuncs.rkill(
601                         pid, ppid,
602                         host = self.get("hostname"),
603                         user = self.get("username"),
604                         port = self.get("port"),
605                         agent = True,
606                         sudo = sudo,
607                         identity = self.get("identity"),
608                         server_key = self.get("serverKey")
609                         )
610
611         return (out, err), proc
612
613     def copy(self, src, dst):
614         if self.localhost:
615             (out, err), proc = execfuncs.lcopy(source, dest, 
616                     recursive = True,
617                     strict_host_checking = False)
618         else:
619             with self._node_lock:
620                 (out, err), proc = sshfuncs.rcopy(
621                     src, dst, 
622                     port = self.get("port"),
623                     identity = self.get("identity"),
624                     server_key = self.get("serverKey"),
625                     recursive = True,
626                     strict_host_checking = False)
627
628         return (out, err), proc
629
630
631     def upload(self, src, dst, text = False, overwrite = True):
632         """ Copy content to destination
633
634            src  content to copy. Can be a local file, directory or a list of files
635
636            dst  destination path on the remote host (remote is always self.host)
637
638            text src is text input, it must be stored into a temp file before uploading
639         """
640         # If source is a string input 
641         f = None
642         if text and not os.path.isfile(src):
643             # src is text input that should be uploaded as file
644             # create a temporal file with the content to upload
645             f = tempfile.NamedTemporaryFile(delete=False)
646             f.write(src)
647             f.close()
648             src = f.name
649
650         # If dst files should not be overwritten, check that the files do not
651         # exits already 
652         if overwrite == False:
653             src = self.filter_existing_files(src, dst)
654             if not src:
655                 return ("", ""), None 
656
657         if not self.localhost:
658             # Build destination as <user>@<server>:<path>
659             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
660
661         result = self.copy(src, dst)
662
663         # clean up temp file
664         if f:
665             os.remove(f.name)
666
667         return result
668
669     def download(self, src, dst):
670         if not self.localhost:
671             # Build destination as <user>@<server>:<path>
672             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
673         return self.copy(src, dst)
674
675     def install_packages_command(self, packages):
676         command = ""
677         if self.use_rpm:
678             command = rpmfuncs.install_packages_command(self.os, packages)
679         elif self.use_deb:
680             command = debfuncs.install_packages_command(self.os, packages)
681         else:
682             msg = "Error installing packages ( OS not known ) "
683             self.error(msg, self.os)
684             raise RuntimeError, msg
685
686         return command
687
688     def install_packages(self, packages, home, run_home = None):
689         """ Install packages in the Linux host.
690
691         'home' is the directory to upload the package installation script.
692         'run_home' is the directory from where to execute the script.
693         """
694         command = self.install_packages_command(packages)
695
696         run_home = run_home or home
697
698         (out, err), proc = self.run_and_wait(command, run_home, 
699             shfile = os.path.join(home, "instpkg.sh"),
700             pidfile = "instpkg_pidfile",
701             ecodefile = "instpkg_exitcode",
702             stdout = "instpkg_stdout", 
703             stderr = "instpkg_stderr",
704             overwrite = False,
705             raise_on_error = True)
706
707         return (out, err), proc 
708
709     def remove_packages(self, packages, home, run_home = None):
710         """ Uninstall packages from the Linux host.
711
712         'home' is the directory to upload the package un-installation script.
713         'run_home' is the directory from where to execute the script.
714         """
715         if self.use_rpm:
716             command = rpmfuncs.remove_packages_command(self.os, packages)
717         elif self.use_deb:
718             command = debfuncs.remove_packages_command(self.os, packages)
719         else:
720             msg = "Error removing packages ( OS not known ) "
721             self.error(msg)
722             raise RuntimeError, msg
723
724         run_home = run_home or home
725
726         (out, err), proc = self.run_and_wait(command, run_home, 
727             shfile = os.path.join(home, "rmpkg.sh"),
728             pidfile = "rmpkg_pidfile",
729             ecodefile = "rmpkg_exitcode",
730             stdout = "rmpkg_stdout", 
731             stderr = "rmpkg_stderr",
732             overwrite = False,
733             raise_on_error = True)
734          
735         return (out, err), proc 
736
737     def mkdir(self, path, clean = False):
738         if clean:
739             self.rmdir(path)
740
741         return self.execute("mkdir -p %s" % path, with_lock = True)
742
743     def rmdir(self, path):
744         return self.execute("rm -rf %s" % path, with_lock = True)
745         
746     def run_and_wait(self, command, home, 
747             shfile = "cmd.sh",
748             env = None,
749             overwrite = True,
750             pidfile = "pidfile", 
751             ecodefile = "exitcode", 
752             stdin = None, 
753             stdout = "stdout", 
754             stderr = "stderr", 
755             sudo = False,
756             tty = False,
757             raise_on_error = False):
758         """
759         Uploads the 'command' to a bash script in the host.
760         Then runs the script detached in background in the host, and
761         busy-waites until the script finishes executing.
762         """
763
764         if not shfile.startswith("/"):
765             shfile = os.path.join(home, shfile)
766
767         self.upload_command(command, 
768             shfile = shfile, 
769             ecodefile = ecodefile, 
770             env = env,
771             overwrite = overwrite)
772
773         command = "bash %s" % shfile
774         # run command in background in remote host
775         (out, err), proc = self.run(command, home, 
776                 pidfile = pidfile,
777                 stdin = stdin, 
778                 stdout = stdout, 
779                 stderr = stderr, 
780                 sudo = sudo,
781                 tty = tty)
782
783         # check no errors occurred
784         if proc.poll():
785             msg = " Failed to run command '%s' " % command
786             self.error(msg, out, err)
787             if raise_on_error:
788                 raise RuntimeError, msg
789
790         # Wait for pid file to be generated
791         pid, ppid = self.wait_pid(
792                 home = home, 
793                 pidfile = pidfile, 
794                 raise_on_error = raise_on_error)
795
796         # wait until command finishes to execute
797         self.wait_run(pid, ppid)
798       
799         (eout, err), proc = self.check_errors(home,
800             ecodefile = ecodefile,
801             stderr = stderr)
802
803         # Out is what was written in the stderr file
804         if err:
805             msg = " Failed to run command '%s' " % command
806             self.error(msg, eout, err)
807
808             if raise_on_error:
809                 raise RuntimeError, msg
810
811         (out, oerr), proc = self.check_output(home, stdout)
812         
813         return (out, err), proc
814
815     def exitcode(self, home, ecodefile = "exitcode"):
816         """
817         Get the exit code of an application.
818         Returns an integer value with the exit code 
819         """
820         (out, err), proc = self.check_output(home, ecodefile)
821
822         # Succeeded to open file, return exit code in the file
823         if proc.wait() == 0:
824             try:
825                 return int(out.strip())
826             except:
827                 # Error in the content of the file!
828                 return ExitCode.CORRUPTFILE
829
830         # No such file or directory
831         if proc.returncode == 1:
832             return ExitCode.FILENOTFOUND
833         
834         # Other error from 'cat'
835         return ExitCode.ERROR
836
837     def upload_command(self, command, 
838             shfile = "cmd.sh",
839             ecodefile = "exitcode",
840             overwrite = True,
841             env = None):
842         """ Saves the command as a bash script file in the remote host, and
843         forces to save the exit code of the command execution to the ecodefile
844         """
845
846         if not (command.strip().endswith(";") or command.strip().endswith("&")):
847             command += ";"
848       
849         # The exit code of the command will be stored in ecodefile
850         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
851                 'command': command,
852                 'ecodefile': ecodefile,
853                 } 
854
855         # Export environment
856         environ = self.format_environment(env)
857
858         # Add environ to command
859         command = environ + command
860
861         return self.upload(command, shfile, text = True, overwrite = overwrite)
862
863     def format_environment(self, env, inline = False):
864         """ Formats the environment variables for a command to be executed
865         either as an inline command
866         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
867         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
868         """
869         if not env: return ""
870
871         # Remove extra white spaces
872         env = re.sub(r'\s+', ' ', env.strip())
873
874         sep = ";" if inline else "\n"
875         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
876
877     def check_errors(self, home, 
878             ecodefile = "exitcode", 
879             stderr = "stderr"):
880         """ Checks whether errors occurred while running a command.
881         It first checks the exit code for the command, and only if the
882         exit code is an error one it returns the error output.
883
884         """
885         proc = None
886         err = ""
887
888         # get exit code saved in the 'exitcode' file
889         ecode = self.exitcode(home, ecodefile)
890
891         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
892             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
893         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
894             # The process returned an error code or didn't exist. 
895             # Check standard error.
896             (err, eerr), proc = self.check_output(home, stderr)
897
898             # If the stderr file was not found, assume nothing bad happened,
899             # and just ignore the error.
900             # (cat returns 1 for error "No such file or directory")
901             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
902                 err = "" 
903             
904         return ("", err), proc
905  
906     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
907         """ Waits until the pid file for the command is generated, 
908             and returns the pid and ppid of the process """
909         pid = ppid = None
910         delay = 1.0
911
912         for i in xrange(4):
913             pidtuple = self.getpid(home = home, pidfile = pidfile)
914             
915             if pidtuple:
916                 pid, ppid = pidtuple
917                 break
918             else:
919                 time.sleep(delay)
920                 delay = delay * 1.5
921         else:
922             msg = " Failed to get pid for pidfile %s/%s " % (
923                     home, pidfile )
924             self.error(msg)
925             
926             if raise_on_error:
927                 raise RuntimeError, msg
928
929         return pid, ppid
930
931     def wait_run(self, pid, ppid, trial = 0):
932         """ wait for a remote process to finish execution """
933         delay = 1.0
934
935         while True:
936             status = self.status(pid, ppid)
937             
938             if status is ProcStatus.FINISHED:
939                 break
940             elif status is not ProcStatus.RUNNING:
941                 delay = delay * 1.5
942                 time.sleep(delay)
943                 # If it takes more than 20 seconds to start, then
944                 # asume something went wrong
945                 if delay > 20:
946                     break
947             else:
948                 # The app is running, just wait...
949                 time.sleep(0.5)
950
951     def check_output(self, home, filename):
952         """ Retrives content of file """
953         (out, err), proc = self.execute("cat %s" % 
954             os.path.join(home, filename), retry = 1, with_lock = True)
955         return (out, err), proc
956
957     def is_alive(self):
958         """ Checks if host is responsive
959         """
960         if self.localhost:
961             return True
962
963         out = err = ""
964         # The underlying SSH layer will sometimes return an empty
965         # output (even if the command was executed without errors).
966         # To work arround this, repeat the operation N times or
967         # until the result is not empty string
968         retrydelay = 1.0
969         for i in xrange(10):
970             try:
971                 (out, err), proc = self.execute("echo 'ALIVE'",
972                         retry = 5,
973                         blocking = True,
974                         with_lock = True)
975         
976                 if out.find("ALIVE") > -1:
977                     return True
978             except:
979                 trace = traceback.format_exc()
980                 msg = "Unresponsive host. Error reaching host: %s " % trace
981                 self.error(msg, out, err)
982                 return False
983
984             time.sleep(min(30.0, retrydelay))
985             retrydelay *= 1.5
986
987         if out.find("ALIVE") > -1:
988             return True
989         else:
990             msg = "Unresponsive host. Wrong answer. "
991             self.error(msg, out, err)
992             return False
993
994     def find_home(self):
995         """ Retrieves host home directory
996         """
997         # The underlying SSH layer will sometimes return an empty
998         # output (even if the command was executed without errors).
999         # To work arround this, repeat the operation N times or
1000         # until the result is not empty string
1001         retrydelay = 1.0
1002         for i in xrange(10):
1003             try:
1004                 (out, err), proc = self.execute("echo ${HOME}",
1005                         retry = 5,
1006                         blocking = True,
1007                         with_lock = True)
1008         
1009                 if out.strip() != "":
1010                     self._home_dir =  out.strip()
1011                     break
1012             except:
1013                 trace = traceback.format_exc()
1014                 msg = "Impossible to retrieve HOME directory" % trace
1015                 self.error(msg, out, err)
1016                 return False
1017
1018             time.sleep(min(30.0, retrydelay))
1019             retrydelay *= 1.5
1020
1021         if not self._home_dir:
1022             msg = "Impossible to retrieve HOME directory"
1023             self.error(msg, out, err)
1024             raise RuntimeError, msg
1025
1026     def filter_existing_files(self, src, dst):
1027         """ Removes files that already exist in the Linux host from src list
1028         """
1029         # construct a dictionary with { dst: src }
1030         dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
1031             src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1032
1033         command = []
1034         for d in dests.keys():
1035             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1036
1037         command = ";".join(command)
1038
1039         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1040     
1041         for d in dests.keys():
1042             if out.find(d) > -1:
1043                 del dests[d]
1044
1045         if not dests:
1046             return ""
1047
1048         return " ".join(dests.values())
1049