expose stack trace in error message when node is considered unresponsive
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy, 
21                                      ResourceState)
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33 import traceback
34
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!! 
37
38 class ExitCode:
39     """
40     Error codes that the rexitcode function can return if unable to
41     check the exit code of a spawned process
42     """
43     FILENOTFOUND = -1
44     CORRUPTFILE = -2
45     ERROR = -3
46     OK = 0
47
48 class OSType:
49     """
50     Supported flavors of Linux OS
51     """
52     DEBIAN = 1 
53     UBUNTU = 1 << 1 
54     FEDORA = 1 << 2
55     FEDORA_8 = 1 << 3 | FEDORA 
56     FEDORA_12 = 1 << 4 | FEDORA 
57     FEDORA_14 = 1 << 5 | FEDORA 
58
59 @clsinit_copy
60 class LinuxNode(ResourceManager):
61     """
62     .. class:: Class Args :
63       
64         :param ec: The Experiment controller
65         :type ec: ExperimentController
66         :param guid: guid of the RM
67         :type guid: int
68
69     .. note::
70
71         There are different ways in which commands can be executed using the
72         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
73         'run_and_wait'). 
74         
75         Brief explanation:
76
77             * 'execute' (blocking mode) :  
78
79                      HOW IT WORKS: 'execute', forks a process and run the
80                      command, synchronously, attached to the terminal, in
81                      foreground.
82                      The execute method will block until the command returns
83                      the result on 'out', 'err' (so until it finishes executing).
84   
85                      USAGE: short-lived commands that must be executed attached
86                      to a terminal and in foreground, for which it IS necessary
87                      to block until the command has finished (e.g. if you want
88                      to run 'ls' or 'cat').
89
90             * 'execute' (NON blocking mode - blocking = False) :
91
92                     HOW IT WORKS: Same as before, except that execute method
93                     will return immediately (even if command still running).
94
95                     USAGE: long-lived commands that must be executed attached
96                     to a terminal and in foreground, but for which it is not
97                     necessary to block until the command has finished. (e.g.
98                     start an application using X11 forwarding)
99
100              * 'run' :
101
102                    HOW IT WORKS: Connects to the host ( using SSH if remote)
103                    and launches the command in background, detached from any
104                    terminal (daemonized), and returns. The command continues to
105                    run remotely, but since it is detached from the terminal,
106                    its pipes (stdin, stdout, stderr) can't be redirected to the
107                    console (as normal non detached processes would), and so they
108                    are explicitly redirected to files. The pidfile is created as
109                    part of the process of launching the command. The pidfile
110                    holds the pid and ppid of the process forked in background,
111                    so later on it is possible to check whether the command is still
112                    running.
113
114                     USAGE: long-lived commands that can run detached in background,
115                     for which it is NOT necessary to block (wait) until the command
116                     has finished. (e.g. start an application that is not using X11
117                     forwarding. It can run detached and remotely in background)
118
119              * 'run_and_wait' :
120
121                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122                     the command has finished execution. It also checks whether
123                     errors occurred during runtime by reading the exitcode file,
124                     which contains the exit code of the command that was run
125                     (checking stderr only is not always reliable since many
126                     commands throw debugging info to stderr and the only way to
127                     automatically know whether an error really happened is to
128                     check the process exit code).
129
130                     Another difference with respect to 'run', is that instead
131                     of directly executing the command as a bash command line,
132                     it uploads the command to a bash script and runs the script.
133                     This allows to use the bash script to debug errors, since
134                     it remains at the remote host and can be run manually to
135                     reproduce the error.
136                   
137                     USAGE: medium-lived commands that can run detached in
138                     background, for which it IS necessary to block (wait) until
139                     the command has finished. (e.g. Package installation,
140                     source compilation, file download, etc)
141
142     """
143     _rtype = "linux::Node"
144     _help = "Controls Linux host machines ( either localhost or a host " \
145             "that can be accessed using a SSH key)"
146     _platform = "linux"
147
148     @classmethod
149     def _register_attributes(cls):
150         cls._register_attribute(Attribute(
151             "hostname", "Hostname of the machine",
152             flags = Flags.Design))
153
154         cls._register_attribute(Attribute(
155             "username", "Local account username", 
156             flags = Flags.Credential))
157
158         cls._register_attribute(Attribute(
159             "port", "SSH port",
160             flags = Flags.Design))
161         
162         cls._register_attribute(Attribute(
163             "home",
164             "Experiment home directory to store all experiment related files",
165             flags = Flags.Design))
166         
167         cls._register_attribute(Attribute(
168             "identity", "SSH identity file",
169             flags = Flags.Credential))
170         
171         cls._register_attribute(Attribute(
172             "serverKey", "Server public key", 
173             flags = Flags.Design))
174         
175         cls._register_attribute(Attribute(
176             "cleanHome",
177             "Remove all nepi files and directories "
178             " from node home folder before starting experiment", 
179             type = Types.Bool,
180             default = False,
181             flags = Flags.Design))
182
183         cls._register_attribute(Attribute(
184             "cleanExperiment", "Remove all files and directories " 
185             " from a previous same experiment, before the new experiment starts", 
186             type = Types.Bool,
187             default = False,
188             flags = Flags.Design))
189         
190         cls._register_attribute(Attribute(
191             "cleanProcesses", 
192             "Kill all running processes before starting experiment",
193             type = Types.Bool,
194             default = False,
195             flags = Flags.Design))
196         
197         cls._register_attribute(Attribute(
198             "cleanProcessesAfter", 
199             """Kill all running processes after starting experiment
200             This might be dangerous when using user root""",
201             type = Types.Bool,
202             default = True,
203             flags = Flags.Design))
204         
205         cls._register_attribute(Attribute(
206             "tearDown",
207             "Bash script to be executed before releasing the resource",
208             flags = Flags.Design))
209
210         cls._register_attribute(Attribute(
211             "gatewayUser",
212             "Gateway account username",
213             flags = Flags.Design))
214
215         cls._register_attribute(Attribute(
216             "gateway",
217             "Hostname of the gateway machine",
218             flags = Flags.Design))
219
220         cls._register_attribute(Attribute(
221             "ip",
222             "Linux host public IP address. "
223             "Must not be modified by the user unless hostname is 'localhost'",
224             flags = Flags.Design))
225
226     def __init__(self, ec, guid):
227         super(LinuxNode, self).__init__(ec, guid)
228         self._os = None
229         # home directory at Linux host
230         self._home_dir = ""
231
232         # lock to prevent concurrent applications on the same node,
233         # to execute commands at the same time. There are potential
234         # concurrency issues when using SSH to a same host from 
235         # multiple threads. There are also possible operational 
236         # issues, e.g. an application querying the existence 
237         # of a file or folder prior to its creation, and another 
238         # application creating the same file or folder in between.
239         self._node_lock = threading.Lock()
240         
241     def log_message(self, msg):
242         return " guid {} - host {} - {} "\
243             .format(self.guid, self.get("hostname"), msg)
244
245     @property
246     def home_dir(self):
247         home = self.get("home") or ""
248         if not home.startswith("/"):
249             home = os.path.join(self._home_dir, home) 
250             return home
251
252     @property
253     def nepi_home(self):
254         return os.path.join(self.home_dir, ".nepi")
255
256     @property
257     def usr_dir(self):
258         return os.path.join(self.nepi_home, "nepi-usr")
259
260     @property
261     def lib_dir(self):
262         return os.path.join(self.usr_dir, "lib")
263
264     @property
265     def bin_dir(self):
266         return os.path.join(self.usr_dir, "bin")
267
268     @property
269     def src_dir(self):
270         return os.path.join(self.usr_dir, "src")
271
272     @property
273     def share_dir(self):
274         return os.path.join(self.usr_dir, "share")
275
276     @property
277     def exp_dir(self):
278         return os.path.join(self.nepi_home, "nepi-exp")
279
280     @property
281     def exp_home(self):
282         return os.path.join(self.exp_dir, self.ec.exp_id)
283
284     @property
285     def node_home(self):
286         return os.path.join(self.exp_home, "node-{}".format(self.guid))
287
288     @property
289     def run_home(self):
290         return os.path.join(self.node_home, self.ec.run_id)
291
292     @property
293     def os(self):
294         if self._os:
295             return self._os
296
297         if not self.localhost and not self.get("username"):
298             msg = "Can't resolve OS, insufficient data "
299             self.error(msg)
300             raise RuntimeError, msg
301
302         out = self.get_os()
303
304         if out.find("Debian") == 0: 
305             self._os = OSType.DEBIAN
306         elif out.find("Ubuntu") ==0:
307             self._os = OSType.UBUNTU
308         elif out.find("Fedora release") == 0:
309             self._os = OSType.FEDORA
310             if out.find("Fedora release 8") == 0:
311                 self._os = OSType.FEDORA_8
312             elif out.find("Fedora release 12") == 0:
313                 self._os = OSType.FEDORA_12
314             elif out.find("Fedora release 14") == 0:
315                 self._os = OSType.FEDORA_14
316         else:
317             msg = "Unsupported OS"
318             self.error(msg, out)
319             raise RuntimeError("{} - {} ".format(msg, out))
320
321         return self._os
322
323     def get_os(self):
324         # The underlying SSH layer will sometimes return an empty
325         # output (even if the command was executed without errors).
326         # To work arround this, repeat the operation N times or
327         # until the result is not empty string
328         out = ""
329         try:
330             (out, err), proc = self.execute("cat /etc/issue", 
331                                             with_lock = True,
332                                             blocking = True)
333         except:
334             trace = traceback.format_exc()
335             msg = "Error detecting OS: {} ".format(trace)
336             self.error(msg, out, err)
337     
338         return out
339
340     @property
341     def use_deb(self):
342         return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
343
344     @property
345     def use_rpm(self):
346         return (self.os & OSType.FEDORA)
347
348     @property
349     def localhost(self):
350         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
351
352     def do_provision(self):
353         # check if host is alive
354         if not self.is_alive():
355             trace = traceback.format_exc()
356             msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
357             self.error(msg)
358             raise RuntimeError, msg
359
360         self.find_home()
361
362         if self.get("cleanProcesses"):
363             self.clean_processes()
364
365         if self.get("cleanHome"):
366             self.clean_home()
367             
368         if self.get("cleanExperiment"):
369             self.clean_experiment()
370             
371         # Create shared directory structure and node home directory
372         paths = [self.lib_dir, 
373                  self.bin_dir, 
374                  self.src_dir, 
375                  self.share_dir, 
376                  self.node_home]
377
378         self.mkdir(paths)
379
380         # Get Public IP address if possible
381         if not self.get("ip"):
382             try:
383                 ip = sshfuncs.gethostbyname(self.get("hostname"))
384                 self.set("ip", ip)
385             except:
386                 if self.get("gateway") is None:
387                     msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
388                     self.error(msg)
389
390         super(LinuxNode, self).do_provision()
391
392     def do_deploy(self):
393         if self.state == ResourceState.NEW:
394             self.info("Deploying node")
395             self.do_discover()
396             self.do_provision()
397
398         # Node needs to wait until all associated interfaces are 
399         # ready before it can finalize deployment
400         from nepi.resources.linux.interface import LinuxInterface
401         ifaces = self.get_connected(LinuxInterface.get_rtype())
402         for iface in ifaces:
403             if iface.state < ResourceState.READY:
404                 self.ec.schedule(self.reschedule_delay, self.deploy)
405                 return 
406
407         super(LinuxNode, self).do_deploy()
408
409     def do_release(self):
410         rms = self.get_connected()
411         for rm in rms:
412             # Node needs to wait until all associated RMs are released
413             # before it can be released
414             if rm.state != ResourceState.RELEASED:
415                 self.ec.schedule(self.reschedule_delay, self.release)
416                 return 
417
418         tear_down = self.get("tearDown")
419         if tear_down:
420             self.execute(tear_down)
421
422         if self.get("cleanProcessesAfter"):
423             self.clean_processes()
424
425         super(LinuxNode, self).do_release()
426
427     def valid_connection(self, guid):
428         # TODO: Validate!
429         return True
430
431     def clean_processes(self):
432         self.info("Cleaning up processes")
433
434         if self.localhost:
435             return 
436         
437         if self.get("username") != 'root':
438             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
439                    "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
440                    "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
441         else:
442             if self.state >= ResourceState.READY:
443                 import pickle
444                 pids = pickle.load(open("/tmp/save.proc", "rb"))
445                 pids_temp = dict()
446                 ps_aux = "ps aux |awk '{print $2,$11}'"
447                 (out, err), proc = self.execute(ps_aux)
448                 if len(out) != 0:
449                     for line in out.strip().split("\n"):
450                         parts = line.strip().split(" ")
451                         pids_temp[parts[0]] = parts[1]
452                     kill_pids = set(pids_temp.items()) - set(pids.items())
453                     kill_pids = ' '.join(dict(kill_pids).keys())
454
455                     cmd = ("killall tcpdump || /bin/true ; " +
456                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
457                            "kill {} || /bin/true ; ".format(kill_pids))
458                 else:
459                     cmd = ("killall tcpdump || /bin/true ; " +
460                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
461             else:
462                 cmd = ("killall tcpdump || /bin/true ; " +
463                        "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
464
465         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
466
467     def clean_home(self):
468         """ Cleans all NEPI related folders in the Linux host
469         """
470         self.info("Cleaning up home")
471         
472         cmd = "cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "\
473               .format(self.home_dir)
474
475         return self.execute(cmd, with_lock = True)
476
477     def clean_experiment(self):
478         """ Cleans all experiment related files in the Linux host.
479         It preserves NEPI files and folders that have a multi experiment
480         scope.
481         """
482         self.info("Cleaning up experiment files")
483         
484         cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "\
485               .format(self.exp_dir, self.ec.exp_id)
486         
487         return self.execute(cmd, with_lock = True)
488
489     def execute(self, command,
490                 sudo = False,
491                 env = None,
492                 tty = False,
493                 forward_x11 = False,
494                 retry = 3,
495                 connect_timeout = 30,
496                 strict_host_checking = False,
497                 persistent = True,
498                 blocking = True,
499                 with_lock = False
500     ):
501         """ Notice that this invocation will block until the
502         execution finishes. If this is not the desired behavior,
503         use 'run' instead."""
504
505         if self.localhost:
506             (out, err), proc = execfuncs.lexec(
507                 command, 
508                 user = self.get("username"), # still problem with localhost
509                 sudo = sudo,
510                 env = env)
511         else:
512             if with_lock:
513                 # If the execute command is blocking, we don't want to keep
514                 # the node lock. This lock is used to avoid race conditions
515                 # when creating the ControlMaster sockets. A more elegant
516                 # solution is needed.
517                 with self._node_lock:
518                     (out, err), proc = sshfuncs.rexec(
519                         command, 
520                         host = self.get("hostname"),
521                         user = self.get("username"),
522                         port = self.get("port"),
523                         gwuser = self.get("gatewayUser"),
524                         gw = self.get("gateway"),
525                         agent = True,
526                         sudo = sudo,
527                         identity = self.get("identity"),
528                         server_key = self.get("serverKey"),
529                         env = env,
530                         tty = tty,
531                         forward_x11 = forward_x11,
532                         retry = retry,
533                         connect_timeout = connect_timeout,
534                         persistent = persistent,
535                         blocking = blocking, 
536                         strict_host_checking = strict_host_checking
537                     )
538             else:
539                 (out, err), proc = sshfuncs.rexec(
540                     command, 
541                     host = self.get("hostname"),
542                     user = self.get("username"),
543                     port = self.get("port"),
544                     gwuser = self.get("gatewayUser"),
545                     gw = self.get("gateway"),
546                     agent = True,
547                     sudo = sudo,
548                     identity = self.get("identity"),
549                     server_key = self.get("serverKey"),
550                     env = env,
551                     tty = tty,
552                     forward_x11 = forward_x11,
553                     retry = retry,
554                     connect_timeout = connect_timeout,
555                     persistent = persistent,
556                     blocking = blocking, 
557                     strict_host_checking = strict_host_checking
558                 )
559
560         return (out, err), proc
561
562     def run(self, command, home,
563             create_home = False,
564             pidfile = 'pidfile',
565             stdin = None, 
566             stdout = 'stdout', 
567             stderr = 'stderr', 
568             sudo = False,
569             tty = False,
570             strict_host_checking = False):
571         
572         self.debug("Running command '{}'".format(command))
573         
574         if self.localhost:
575             (out, err), proc = execfuncs.lspawn(
576                 command, pidfile,
577                 home = home, 
578                 create_home = create_home, 
579                 stdin = stdin or '/dev/null',
580                 stdout = stdout or '/dev/null',
581                 stderr = stderr or '/dev/null',
582                 sudo = sudo) 
583         else:
584             with self._node_lock:
585                 (out, err), proc = sshfuncs.rspawn(
586                     command,
587                     pidfile = pidfile,
588                     home = home,
589                     create_home = create_home,
590                     stdin = stdin or '/dev/null',
591                     stdout = stdout or '/dev/null',
592                     stderr = stderr or '/dev/null',
593                     sudo = sudo,
594                     host = self.get("hostname"),
595                     user = self.get("username"),
596                     port = self.get("port"),
597                     gwuser = self.get("gatewayUser"),
598                     gw = self.get("gateway"),
599                     agent = True,
600                     identity = self.get("identity"),
601                     server_key = self.get("serverKey"),
602                     tty = tty,
603                     strict_host_checking = strict_host_checking
604                 )
605
606         return (out, err), proc
607
608     def getpid(self, home, pidfile = "pidfile"):
609         if self.localhost:
610             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
611         else:
612             with self._node_lock:
613                 pidtuple = sshfuncs.rgetpid(
614                     os.path.join(home, pidfile),
615                     host = self.get("hostname"),
616                     user = self.get("username"),
617                     port = self.get("port"),
618                     gwuser = self.get("gatewayUser"),
619                     gw = self.get("gateway"),
620                     agent = True,
621                     identity = self.get("identity"),
622                     server_key = self.get("serverKey"),
623                     strict_host_checking = False
624                 )
625         
626         return pidtuple
627
628     def status(self, pid, ppid):
629         if self.localhost:
630             status = execfuncs.lstatus(pid, ppid)
631         else:
632             with self._node_lock:
633                 status = sshfuncs.rstatus(
634                     pid, ppid,
635                     host = self.get("hostname"),
636                     user = self.get("username"),
637                     port = self.get("port"),
638                     gwuser = self.get("gatewayUser"),
639                     gw = self.get("gateway"),
640                     agent = True,
641                     identity = self.get("identity"),
642                     server_key = self.get("serverKey"),
643                     strict_host_checking = False
644                 )
645            
646         return status
647     
648     def kill(self, pid, ppid, sudo = False):
649         out = err = ""
650         proc = None
651         status = self.status(pid, ppid)
652
653         if status == sshfuncs.ProcStatus.RUNNING:
654             if self.localhost:
655                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
656             else:
657                 with self._node_lock:
658                     (out, err), proc = sshfuncs.rkill(
659                         pid, ppid,
660                         host = self.get("hostname"),
661                         user = self.get("username"),
662                         port = self.get("port"),
663                         gwuser = self.get("gatewayUser"),
664                         gw = self.get("gateway"),
665                         agent = True,
666                         sudo = sudo,
667                         identity = self.get("identity"),
668                         server_key = self.get("serverKey"),
669                         strict_host_checking = False
670                     )
671
672         return (out, err), proc
673
674     def copy(self, src, dst):
675         if self.localhost:
676             (out, err), proc = execfuncs.lcopy(
677                 src, dst, 
678                 recursive = True)
679         else:
680             with self._node_lock:
681                 (out, err), proc = sshfuncs.rcopy(
682                     src, dst, 
683                     port = self.get("port"),
684                     gwuser = self.get("gatewayUser"),
685                     gw = self.get("gateway"),
686                     identity = self.get("identity"),
687                     server_key = self.get("serverKey"),
688                     recursive = True,
689                     strict_host_checking = False)
690
691         return (out, err), proc
692
693     def upload(self, src, dst, text = False, overwrite = True,
694                raise_on_error = True):
695         """ Copy content to destination
696
697         src  string with the content to copy. Can be:
698             - plain text
699             - a string with the path to a local file
700             - a string with a semi-colon separeted list of local files
701             - a string with a local directory
702
703         dst  string with destination path on the remote host (remote is 
704             always self.host)
705
706         text src is text input, it must be stored into a temp file before 
707         uploading
708         """
709         # If source is a string input 
710         f = None
711         if text and not os.path.isfile(src):
712             # src is text input that should be uploaded as file
713             # create a temporal file with the content to upload
714             f = tempfile.NamedTemporaryFile(delete=False)
715             f.write(src)
716             f.close()
717             src = f.name
718
719         # If dst files should not be overwritten, check that the files do not
720         # exits already
721         if isinstance(src, str):
722             src = map(str.strip, src.split(";"))
723     
724         if overwrite == False:
725             src = self.filter_existing_files(src, dst)
726             if not src:
727                 return ("", ""), None
728
729         if not self.localhost:
730             # Build destination as <user>@<server>:<path>
731             dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
732
733         ((out, err), proc) = self.copy(src, dst)
734
735         # clean up temp file
736         if f:
737             os.remove(f.name)
738
739         if err:
740             msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
741             self.error(msg, out, err)
742             
743             msg = "{} out: {} err: {}".format(msg, out, err)
744             if raise_on_error:
745                 raise RuntimeError, msg
746
747         return ((out, err), proc)
748
749     def download(self, src, dst, raise_on_error = True):
750         if not self.localhost:
751             # Build destination as <user>@<server>:<path>
752             src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
753
754         ((out, err), proc) = self.copy(src, dst)
755
756         if err:
757             msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst) 
758             self.error(msg, out, err)
759
760             if raise_on_error:
761                 raise RuntimeError, msg
762
763         return ((out, err), proc)
764
765     def install_packages_command(self, packages):
766         command = ""
767         if self.use_rpm:
768             command = rpmfuncs.install_packages_command(self.os, packages)
769         elif self.use_deb:
770             command = debfuncs.install_packages_command(self.os, packages)
771         else:
772             msg = "Error installing packages ( OS not known ) "
773             self.error(msg, self.os)
774             raise RuntimeError, msg
775
776         return command
777
778     def install_packages(self, packages, home,
779                          run_home = None,
780                          raise_on_error = True):
781         """ Install packages in the Linux host.
782
783         'home' is the directory to upload the package installation script.
784         'run_home' is the directory from where to execute the script.
785         """
786         command = self.install_packages_command(packages)
787
788         run_home = run_home or home
789
790         (out, err), proc = self.run_and_wait(command, run_home, 
791                                              shfile = os.path.join(home, "instpkg.sh"),
792                                              pidfile = "instpkg_pidfile",
793                                              ecodefile = "instpkg_exitcode",
794                                              stdout = "instpkg_stdout", 
795                                              stderr = "instpkg_stderr",
796                                              overwrite = False,
797                                              raise_on_error = raise_on_error)
798
799         return (out, err), proc 
800
801     def remove_packages(self, packages, home, run_home = None,
802                         raise_on_error = True):
803         """ Uninstall packages from the Linux host.
804
805         'home' is the directory to upload the package un-installation script.
806         'run_home' is the directory from where to execute the script.
807         """
808         if self.use_rpm:
809             command = rpmfuncs.remove_packages_command(self.os, packages)
810         elif self.use_deb:
811             command = debfuncs.remove_packages_command(self.os, packages)
812         else:
813             msg = "Error removing packages ( OS not known ) "
814             self.error(msg)
815             raise RuntimeError, msg
816
817         run_home = run_home or home
818
819         (out, err), proc = self.run_and_wait(command, run_home, 
820                                              shfile = os.path.join(home, "rmpkg.sh"),
821                                              pidfile = "rmpkg_pidfile",
822                                              ecodefile = "rmpkg_exitcode",
823                                              stdout = "rmpkg_stdout", 
824                                              stderr = "rmpkg_stderr",
825                                              overwrite = False,
826                                              raise_on_error = raise_on_error)
827         
828         return (out, err), proc 
829
830     def mkdir(self, paths, clean = False):
831         """ Paths is either a single remote directory path to create,
832         or a list of directories to create.
833         """
834         if clean:
835             self.rmdir(paths)
836
837         if isinstance(paths, str):
838             paths = [paths]
839
840         cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
841
842         return self.execute(cmd, with_lock = True)
843
844     def rmdir(self, paths):
845         """ Paths is either a single remote directory path to delete,
846         or a list of directories to delete.
847         """
848
849         if isinstance(paths, str):
850             paths = [paths]
851
852         cmd = " ; ".join(map(lambda path: "rm -rf {}".format(path), paths))
853
854         return self.execute(cmd, with_lock = True)
855     
856     def run_and_wait(self, command, home, 
857                      shfile="cmd.sh",
858                      env=None,
859                      overwrite=True,
860                      wait_run=True,
861                      pidfile="pidfile", 
862                      ecodefile="exitcode", 
863                      stdin=None, 
864                      stdout="stdout", 
865                      stderr="stderr", 
866                      sudo=False,
867                      tty=False,
868                      raise_on_error=True):
869         """
870         Uploads the 'command' to a bash script in the host.
871         Then runs the script detached in background in the host, and
872         busy-waites until the script finishes executing.
873         """
874
875         if not shfile.startswith("/"):
876             shfile = os.path.join(home, shfile)
877
878         self.upload_command(command, 
879                             shfile = shfile, 
880                             ecodefile = ecodefile, 
881                             env = env,
882                             overwrite = overwrite)
883
884         command = "bash {}".format(shfile)
885         # run command in background in remote host
886         (out, err), proc = self.run(command, home, 
887                                     pidfile = pidfile,
888                                     stdin = stdin, 
889                                     stdout = stdout, 
890                                     stderr = stderr, 
891                                     sudo = sudo,
892                                     tty = tty)
893
894         # check no errors occurred
895         if proc.poll():
896             msg = " Failed to run command '{}' ".format(command)
897             self.error(msg, out, err)
898             if raise_on_error:
899                 raise RuntimeError, msg
900
901         # Wait for pid file to be generated
902         pid, ppid = self.wait_pid(
903             home = home, 
904             pidfile = pidfile, 
905             raise_on_error = raise_on_error)
906
907         if wait_run:
908             # wait until command finishes to execute
909             self.wait_run(pid, ppid)
910             
911             (eout, err), proc = self.check_errors(home,
912                                                   ecodefile = ecodefile,
913                                                   stderr = stderr)
914
915             # Out is what was written in the stderr file
916             if err:
917                 msg = " Failed to run command '{}' ".format(command)
918                 self.error(msg, eout, err)
919
920                 if raise_on_error:
921                     raise RuntimeError, msg
922
923         (out, oerr), proc = self.check_output(home, stdout)
924         
925         return (out, err), proc
926         
927     def exitcode(self, home, ecodefile = "exitcode"):
928         """
929         Get the exit code of an application.
930         Returns an integer value with the exit code 
931         """
932         (out, err), proc = self.check_output(home, ecodefile)
933
934         # Succeeded to open file, return exit code in the file
935         if proc.wait() == 0:
936             try:
937                 return int(out.strip())
938             except:
939                 # Error in the content of the file!
940                 return ExitCode.CORRUPTFILE
941
942         # No such file or directory
943         if proc.returncode == 1:
944             return ExitCode.FILENOTFOUND
945         
946         # Other error from 'cat'
947         return ExitCode.ERROR
948
949     def upload_command(self, command, 
950                        shfile="cmd.sh",
951                        ecodefile="exitcode",
952                        overwrite=True,
953                        env=None):
954         """ Saves the command as a bash script file in the remote host, and
955         forces to save the exit code of the command execution to the ecodefile
956         """
957
958         if not (command.strip().endswith(";") or command.strip().endswith("&")):
959             command += ";"
960             
961         # The exit code of the command will be stored in ecodefile
962         command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
963                   .format(command=command, ecodefile=ecodefile)
964
965         # Export environment
966         environ = self.format_environment(env)
967
968         # Add environ to command
969         command = environ + command
970
971         return self.upload(command, shfile, text=True, overwrite=overwrite)
972
973     def format_environment(self, env, inline=False):
974         """ Formats the environment variables for a command to be executed
975         either as an inline command
976         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
977         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
978         """
979         if not env: return ""
980
981         # Remove extra white spaces
982         env = re.sub(r'\s+', ' ', env.strip())
983
984         sep = ";" if inline else "\n"
985         return sep.join([" export {}".format(e) for e in env.split(" ")]) + sep 
986
987     def check_errors(self, home, 
988                      ecodefile = "exitcode", 
989                      stderr = "stderr"):
990         """ Checks whether errors occurred while running a command.
991         It first checks the exit code for the command, and only if the
992         exit code is an error one it returns the error output.
993
994         """
995         proc = None
996         err = ""
997
998         # get exit code saved in the 'exitcode' file
999         ecode = self.exitcode(home, ecodefile)
1000
1001         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1002             err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1003         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1004             # The process returned an error code or didn't exist. 
1005             # Check standard error.
1006             (err, eerr), proc = self.check_output(home, stderr)
1007
1008             # If the stderr file was not found, assume nothing bad happened,
1009             # and just ignore the error.
1010             # (cat returns 1 for error "No such file or directory")
1011             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1012                 err = "" 
1013                 
1014         return ("", err), proc
1015     
1016     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1017         """ Waits until the pid file for the command is generated, 
1018             and returns the pid and ppid of the process """
1019         pid = ppid = None
1020         delay = 1.0
1021
1022         for i in xrange(2):
1023             pidtuple = self.getpid(home = home, pidfile = pidfile)
1024             
1025             if pidtuple:
1026                 pid, ppid = pidtuple
1027                 break
1028             else:
1029                 time.sleep(delay)
1030                 delay = delay * 1.5
1031         else:
1032             msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1033             self.error(msg)
1034     
1035             if raise_on_error:
1036                 raise RuntimeError, msg
1037
1038         return pid, ppid
1039
1040     def wait_run(self, pid, ppid, trial = 0):
1041         """ wait for a remote process to finish execution """
1042         delay = 1.0
1043
1044         while True:
1045             status = self.status(pid, ppid)
1046             
1047             if status is ProcStatus.FINISHED:
1048                 break
1049             elif status is not ProcStatus.RUNNING:
1050                 delay = delay * 1.5
1051                 time.sleep(delay)
1052                 # If it takes more than 20 seconds to start, then
1053                 # asume something went wrong
1054                 if delay > 20:
1055                     break
1056             else:
1057                 # The app is running, just wait...
1058                 time.sleep(0.5)
1059
1060     def check_output(self, home, filename):
1061         """ Retrives content of file """
1062         (out, err), proc = self.execute(
1063             "cat {}".format(os.path.join(home, filename)), retry = 1, with_lock = True)
1064         return (out, err), proc
1065
1066     def is_alive(self):
1067         """ Checks if host is responsive
1068         """
1069         if self.localhost:
1070             return True
1071
1072         out = err = ""
1073         msg = "Unresponsive host. Wrong answer. "
1074
1075         # The underlying SSH layer will sometimes return an empty
1076         # output (even if the command was executed without errors).
1077         # To work arround this, repeat the operation N times or
1078         # until the result is not empty string
1079         try:
1080             (out, err), proc = self.execute("echo 'ALIVE'",
1081                                             blocking = True,
1082                                             with_lock = True)
1083             
1084             if out.find("ALIVE") > -1:
1085                 return True
1086         except:
1087             trace = traceback.format_exc()
1088             msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1089
1090         self.error(msg, out, err)
1091         return False
1092
1093     def find_home(self):
1094         """ 
1095         Retrieves host home directory
1096         """
1097         # The underlying SSH layer will sometimes return an empty
1098         # output (even if the command was executed without errors).
1099         # To work arround this, repeat the operation N times or
1100         # until the result is not empty string
1101         msg = "Impossible to retrieve HOME directory"
1102         try:
1103             (out, err), proc = self.execute("echo ${HOME}",
1104                                             blocking = True,
1105                                             with_lock = True)
1106             
1107             if out.strip() != "":
1108                 self._home_dir =  out.strip()
1109         except:
1110             trace = traceback.format_exc()
1111             msg = "Impossible to retrieve HOME directory {}".format(trace)
1112
1113         if not self._home_dir:
1114             self.error(msg)
1115             raise RuntimeError, msg
1116
1117     def filter_existing_files(self, src, dst):
1118         """ Removes files that already exist in the Linux host from src list
1119         """
1120         # construct a dictionary with { dst: src }
1121         dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1122                 if len(src) > 1 else {dst: src[0]}
1123
1124         command = []
1125         for d in dests.keys():
1126             command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1127
1128         command = ";".join(command)
1129
1130         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1131         
1132         for d in dests.keys():
1133             if out.find(d) > -1:
1134                 del dests[d]
1135
1136         if not dests:
1137             return []
1138
1139         return dests.values()
1140