Bug fixing in cleanExperiment and cleanHome for linux nodes. The last place-holder...
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy, 
21                                      ResourceState)
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33 import traceback
34
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!! 
37
38 class ExitCode:
39     """
40     Error codes that the rexitcode function can return if unable to
41     check the exit code of a spawned process
42     """
43     FILENOTFOUND = -1
44     CORRUPTFILE = -2
45     ERROR = -3
46     OK = 0
47
48 class OSType:
49     """
50     Supported flavors of Linux OS
51     """
52     DEBIAN = 1 
53     UBUNTU = 1 << 1 
54     FEDORA = 1 << 2
55     FEDORA_8 = 1 << 3 | FEDORA 
56     FEDORA_12 = 1 << 4 | FEDORA 
57     FEDORA_14 = 1 << 5 | FEDORA 
58
59 @clsinit_copy
60 class LinuxNode(ResourceManager):
61     """
62     .. class:: Class Args :
63       
64         :param ec: The Experiment controller
65         :type ec: ExperimentController
66         :param guid: guid of the RM
67         :type guid: int
68
69     .. note::
70
71         There are different ways in which commands can be executed using the
72         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
73         'run_and_wait'). 
74         
75         Brief explanation:
76
77             * 'execute' (blocking mode) :  
78
79                      HOW IT WORKS: 'execute', forks a process and run the
80                      command, synchronously, attached to the terminal, in
81                      foreground.
82                      The execute method will block until the command returns
83                      the result on 'out', 'err' (so until it finishes executing).
84   
85                      USAGE: short-lived commands that must be executed attached
86                      to a terminal and in foreground, for which it IS necessary
87                      to block until the command has finished (e.g. if you want
88                      to run 'ls' or 'cat').
89
90             * 'execute' (NON blocking mode - blocking = False) :
91
92                     HOW IT WORKS: Same as before, except that execute method
93                     will return immediately (even if command still running).
94
95                     USAGE: long-lived commands that must be executed attached
96                     to a terminal and in foreground, but for which it is not
97                     necessary to block until the command has finished. (e.g.
98                     start an application using X11 forwarding)
99
100              * 'run' :
101
102                    HOW IT WORKS: Connects to the host ( using SSH if remote)
103                    and launches the command in background, detached from any
104                    terminal (daemonized), and returns. The command continues to
105                    run remotely, but since it is detached from the terminal,
106                    its pipes (stdin, stdout, stderr) can't be redirected to the
107                    console (as normal non detached processes would), and so they
108                    are explicitly redirected to files. The pidfile is created as
109                    part of the process of launching the command. The pidfile
110                    holds the pid and ppid of the process forked in background,
111                    so later on it is possible to check whether the command is still
112                    running.
113
114                     USAGE: long-lived commands that can run detached in background,
115                     for which it is NOT necessary to block (wait) until the command
116                     has finished. (e.g. start an application that is not using X11
117                     forwarding. It can run detached and remotely in background)
118
119              * 'run_and_wait' :
120
121                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122                     the command has finished execution. It also checks whether
123                     errors occurred during runtime by reading the exitcode file,
124                     which contains the exit code of the command that was run
125                     (checking stderr only is not always reliable since many
126                     commands throw debugging info to stderr and the only way to
127                     automatically know whether an error really happened is to
128                     check the process exit code).
129
130                     Another difference with respect to 'run', is that instead
131                     of directly executing the command as a bash command line,
132                     it uploads the command to a bash script and runs the script.
133                     This allows to use the bash script to debug errors, since
134                     it remains at the remote host and can be run manually to
135                     reproduce the error.
136                   
137                     USAGE: medium-lived commands that can run detached in
138                     background, for which it IS necessary to block (wait) until
139                     the command has finished. (e.g. Package installation,
140                     source compilation, file download, etc)
141
142     """
143     _rtype = "linux::Node"
144     _help = "Controls Linux host machines ( either localhost or a host " \
145             "that can be accessed using a SSH key)"
146     _platform = "linux"
147
148     @classmethod
149     def _register_attributes(cls):
150         cls._register_attribute(Attribute(
151             "hostname", "Hostname of the machine",
152             flags = Flags.Design))
153
154         cls._register_attribute(Attribute(
155             "username", "Local account username", 
156             flags = Flags.Credential))
157
158         cls._register_attribute(Attribute(
159             "port", "SSH port",
160             flags = Flags.Design))
161         
162         cls._register_attribute(Attribute(
163             "home",
164             "Experiment home directory to store all experiment related files",
165             flags = Flags.Design))
166         
167         cls._register_attribute(Attribute(
168             "identity", "SSH identity file",
169             flags = Flags.Credential))
170         
171         cls._register_attribute(Attribute(
172             "serverKey", "Server public key", 
173             flags = Flags.Design))
174         
175         cls._register_attribute(Attribute(
176             "cleanHome",
177             "Remove all nepi files and directories "
178             " from node home folder before starting experiment", 
179             type = Types.Bool,
180             default = False,
181             flags = Flags.Design))
182
183         cls._register_attribute(Attribute(
184             "cleanExperiment", "Remove all files and directories " 
185             " from a previous same experiment, before the new experiment starts", 
186             type = Types.Bool,
187             default = False,
188             flags = Flags.Design))
189         
190         cls._register_attribute(Attribute(
191             "cleanProcesses", 
192             "Kill all running processes before starting experiment",
193             type = Types.Bool,
194             default = False,
195             flags = Flags.Design))
196         
197         cls._register_attribute(Attribute(
198             "cleanProcessesAfter", 
199             """Kill all running processes after starting experiment
200             This might be dangerous when using user root""",
201             type = Types.Bool,
202             default = True,
203             flags = Flags.Design))
204         
205         cls._register_attribute(Attribute(
206             "tearDown",
207             "Bash script to be executed before releasing the resource",
208             flags = Flags.Design))
209
210         cls._register_attribute(Attribute(
211             "gatewayUser",
212             "Gateway account username",
213             flags = Flags.Design))
214
215         cls._register_attribute(Attribute(
216             "gateway",
217             "Hostname of the gateway machine",
218             flags = Flags.Design))
219
220         cls._register_attribute(Attribute(
221             "ip",
222             "Linux host public IP address. "
223             "Must not be modified by the user unless hostname is 'localhost'",
224             flags = Flags.Design))
225
226     def __init__(self, ec, guid):
227         super(LinuxNode, self).__init__(ec, guid)
228         self._os = None
229         # home directory at Linux host
230         self._home_dir = ""
231
232         # lock to prevent concurrent applications on the same node,
233         # to execute commands at the same time. There are potential
234         # concurrency issues when using SSH to a same host from 
235         # multiple threads. There are also possible operational 
236         # issues, e.g. an application querying the existence 
237         # of a file or folder prior to its creation, and another 
238         # application creating the same file or folder in between.
239         self._node_lock = threading.Lock()
240         
241     def log_message(self, msg):
242         return " guid {} - host {} - {} "\
243             .format(self.guid, self.get("hostname"), msg)
244
245     @property
246     def home_dir(self):
247         home = self.get("home") or ""
248         if not home.startswith("/"):
249             home = os.path.join(self._home_dir, home) 
250             return home
251
252     @property
253     def nepi_home(self):
254         return os.path.join(self.home_dir, ".nepi")
255
256     @property
257     def usr_dir(self):
258         return os.path.join(self.nepi_home, "nepi-usr")
259
260     @property
261     def lib_dir(self):
262         return os.path.join(self.usr_dir, "lib")
263
264     @property
265     def bin_dir(self):
266         return os.path.join(self.usr_dir, "bin")
267
268     @property
269     def src_dir(self):
270         return os.path.join(self.usr_dir, "src")
271
272     @property
273     def share_dir(self):
274         return os.path.join(self.usr_dir, "share")
275
276     @property
277     def exp_dir(self):
278         return os.path.join(self.nepi_home, "nepi-exp")
279
280     @property
281     def exp_home(self):
282         return os.path.join(self.exp_dir, self.ec.exp_id)
283
284     @property
285     def node_home(self):
286         return os.path.join(self.exp_home, "node-{}".format(self.guid))
287
288     @property
289     def run_home(self):
290         return os.path.join(self.node_home, self.ec.run_id)
291
292     @property
293     def os(self):
294         if self._os:
295             return self._os
296
297         if not self.localhost and not self.get("username"):
298             msg = "Can't resolve OS, insufficient data "
299             self.error(msg)
300             raise RuntimeError, msg
301
302         out = self.get_os()
303
304         if out.find("Debian") == 0: 
305             self._os = OSType.DEBIAN
306         elif out.find("Ubuntu") ==0:
307             self._os = OSType.UBUNTU
308         elif out.find("Fedora release") == 0:
309             self._os = OSType.FEDORA
310             if out.find("Fedora release 8") == 0:
311                 self._os = OSType.FEDORA_8
312             elif out.find("Fedora release 12") == 0:
313                 self._os = OSType.FEDORA_12
314             elif out.find("Fedora release 14") == 0:
315                 self._os = OSType.FEDORA_14
316         else:
317             msg = "Unsupported OS"
318             self.error(msg, out)
319             raise RuntimeError("{} - {} ".format(msg, out))
320
321         return self._os
322
323     def get_os(self):
324         # The underlying SSH layer will sometimes return an empty
325         # output (even if the command was executed without errors).
326         # To work arround this, repeat the operation N times or
327         # until the result is not empty string
328         out = ""
329         try:
330             (out, err), proc = self.execute("cat /etc/issue", 
331                                             with_lock = True,
332                                             blocking = True)
333         except:
334             trace = traceback.format_exc()
335             msg = "Error detecting OS: {} ".format(trace)
336             self.error(msg, out, err)
337     
338         return out
339
340     @property
341     def use_deb(self):
342         return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
343
344     @property
345     def use_rpm(self):
346         return (self.os & OSType.FEDORA)
347
348     @property
349     def localhost(self):
350         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
351
352     def do_provision(self):
353         # check if host is alive
354         if not self.is_alive():
355             msg = "Deploy failed. Unresponsive node {}".format(self.get("hostname"))
356             self.error(msg)
357             raise RuntimeError, msg
358
359         self.find_home()
360
361         if self.get("cleanProcesses"):
362             self.clean_processes()
363
364         if self.get("cleanHome"):
365             self.clean_home()
366             
367         if self.get("cleanExperiment"):
368             self.clean_experiment()
369             
370         # Create shared directory structure and node home directory
371         paths = [self.lib_dir, 
372                  self.bin_dir, 
373                  self.src_dir, 
374                  self.share_dir, 
375                  self.node_home]
376
377         self.mkdir(paths)
378
379         # Get Public IP address if possible
380         if not self.get("ip"):
381             try:
382                 ip = sshfuncs.gethostbyname(self.get("hostname"))
383                 self.set("ip", ip)
384             except:
385                 if self.get("gateway") is None:
386                     msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
387                     self.error(msg)
388
389         super(LinuxNode, self).do_provision()
390
391     def do_deploy(self):
392         if self.state == ResourceState.NEW:
393             self.info("Deploying node")
394             self.do_discover()
395             self.do_provision()
396
397         # Node needs to wait until all associated interfaces are 
398         # ready before it can finalize deployment
399         from nepi.resources.linux.interface import LinuxInterface
400         ifaces = self.get_connected(LinuxInterface.get_rtype())
401         for iface in ifaces:
402             if iface.state < ResourceState.READY:
403                 self.ec.schedule(self.reschedule_delay, self.deploy)
404                 return 
405
406         super(LinuxNode, self).do_deploy()
407
408     def do_release(self):
409         rms = self.get_connected()
410         for rm in rms:
411             # Node needs to wait until all associated RMs are released
412             # before it can be released
413             if rm.state != ResourceState.RELEASED:
414                 self.ec.schedule(self.reschedule_delay, self.release)
415                 return 
416
417         tear_down = self.get("tearDown")
418         if tear_down:
419             self.execute(tear_down)
420
421         if self.get("cleanProcessesAfter"):
422             self.clean_processes()
423
424         super(LinuxNode, self).do_release()
425
426     def valid_connection(self, guid):
427         # TODO: Validate!
428         return True
429
430     def clean_processes(self):
431         self.info("Cleaning up processes")
432
433         if self.localhost:
434             return 
435         
436         if self.get("username") != 'root':
437             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
438                    "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
439                    "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
440         else:
441             if self.state >= ResourceState.READY:
442                 import pickle
443                 pids = pickle.load(open("/tmp/save.proc", "rb"))
444                 pids_temp = dict()
445                 ps_aux = "ps aux |awk '{print $2,$11}'"
446                 (out, err), proc = self.execute(ps_aux)
447                 if len(out) != 0:
448                     for line in out.strip().split("\n"):
449                         parts = line.strip().split(" ")
450                         pids_temp[parts[0]] = parts[1]
451                     kill_pids = set(pids_temp.items()) - set(pids.items())
452                     kill_pids = ' '.join(dict(kill_pids).keys())
453
454                     cmd = ("killall tcpdump || /bin/true ; " +
455                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
456                            "kill {} || /bin/true ; ".format(kill_pids))
457                 else:
458                     cmd = ("killall tcpdump || /bin/true ; " +
459                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
460             else:
461                 cmd = ("killall tcpdump || /bin/true ; " +
462                        "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
463
464         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
465
466     def clean_home(self):
467         """ Cleans all NEPI related folders in the Linux host
468         """
469         self.info("Cleaning up home")
470         
471         cmd = "cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + "\
472               .format(self.home_dir, self.exp_dir)
473
474         return self.execute(cmd, with_lock = True)
475
476     def clean_experiment(self):
477         """ Cleans all experiment related files in the Linux host.
478         It preserves NEPI files and folders that have a multi experiment
479         scope.
480         """
481         self.info("Cleaning up experiment files")
482         
483         cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {} + "\
484               .format(self.exp_dir, self.ec.exp_id, self.exp_dir)
485         
486         return self.execute(cmd, with_lock = True)
487
488     def execute(self, command,
489                 sudo = False,
490                 env = None,
491                 tty = False,
492                 forward_x11 = False,
493                 retry = 3,
494                 connect_timeout = 30,
495                 strict_host_checking = False,
496                 persistent = True,
497                 blocking = True,
498                 with_lock = False
499     ):
500         """ Notice that this invocation will block until the
501         execution finishes. If this is not the desired behavior,
502         use 'run' instead."""
503
504         if self.localhost:
505             (out, err), proc = execfuncs.lexec(
506                 command, 
507                 user = self.get("username"), # still problem with localhost
508                 sudo = sudo,
509                 env = env)
510         else:
511             if with_lock:
512                 # If the execute command is blocking, we don't want to keep
513                 # the node lock. This lock is used to avoid race conditions
514                 # when creating the ControlMaster sockets. A more elegant
515                 # solution is needed.
516                 with self._node_lock:
517                     (out, err), proc = sshfuncs.rexec(
518                         command, 
519                         host = self.get("hostname"),
520                         user = self.get("username"),
521                         port = self.get("port"),
522                         gwuser = self.get("gatewayUser"),
523                         gw = self.get("gateway"),
524                         agent = True,
525                         sudo = sudo,
526                         identity = self.get("identity"),
527                         server_key = self.get("serverKey"),
528                         env = env,
529                         tty = tty,
530                         forward_x11 = forward_x11,
531                         retry = retry,
532                         connect_timeout = connect_timeout,
533                         persistent = persistent,
534                         blocking = blocking, 
535                         strict_host_checking = strict_host_checking
536                     )
537             else:
538                 (out, err), proc = sshfuncs.rexec(
539                     command, 
540                     host = self.get("hostname"),
541                     user = self.get("username"),
542                     port = self.get("port"),
543                     gwuser = self.get("gatewayUser"),
544                     gw = self.get("gateway"),
545                     agent = True,
546                     sudo = sudo,
547                     identity = self.get("identity"),
548                     server_key = self.get("serverKey"),
549                     env = env,
550                     tty = tty,
551                     forward_x11 = forward_x11,
552                     retry = retry,
553                     connect_timeout = connect_timeout,
554                     persistent = persistent,
555                     blocking = blocking, 
556                     strict_host_checking = strict_host_checking
557                 )
558
559         return (out, err), proc
560
561     def run(self, command, home,
562             create_home = False,
563             pidfile = 'pidfile',
564             stdin = None, 
565             stdout = 'stdout', 
566             stderr = 'stderr', 
567             sudo = False,
568             tty = False,
569             strict_host_checking = False):
570         
571         self.debug("Running command '{}'".format(command))
572         
573         if self.localhost:
574             (out, err), proc = execfuncs.lspawn(
575                 command, pidfile,
576                 home = home, 
577                 create_home = create_home, 
578                 stdin = stdin or '/dev/null',
579                 stdout = stdout or '/dev/null',
580                 stderr = stderr or '/dev/null',
581                 sudo = sudo) 
582         else:
583             with self._node_lock:
584                 (out, err), proc = sshfuncs.rspawn(
585                     command,
586                     pidfile = pidfile,
587                     home = home,
588                     create_home = create_home,
589                     stdin = stdin or '/dev/null',
590                     stdout = stdout or '/dev/null',
591                     stderr = stderr or '/dev/null',
592                     sudo = sudo,
593                     host = self.get("hostname"),
594                     user = self.get("username"),
595                     port = self.get("port"),
596                     gwuser = self.get("gatewayUser"),
597                     gw = self.get("gateway"),
598                     agent = True,
599                     identity = self.get("identity"),
600                     server_key = self.get("serverKey"),
601                     tty = tty,
602                     strict_host_checking = strict_host_checking
603                 )
604
605         return (out, err), proc
606
607     def getpid(self, home, pidfile = "pidfile"):
608         if self.localhost:
609             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
610         else:
611             with self._node_lock:
612                 pidtuple = sshfuncs.rgetpid(
613                     os.path.join(home, pidfile),
614                     host = self.get("hostname"),
615                     user = self.get("username"),
616                     port = self.get("port"),
617                     gwuser = self.get("gatewayUser"),
618                     gw = self.get("gateway"),
619                     agent = True,
620                     identity = self.get("identity"),
621                     server_key = self.get("serverKey"),
622                     strict_host_checking = False
623                 )
624         
625         return pidtuple
626
627     def status(self, pid, ppid):
628         if self.localhost:
629             status = execfuncs.lstatus(pid, ppid)
630         else:
631             with self._node_lock:
632                 status = sshfuncs.rstatus(
633                     pid, ppid,
634                     host = self.get("hostname"),
635                     user = self.get("username"),
636                     port = self.get("port"),
637                     gwuser = self.get("gatewayUser"),
638                     gw = self.get("gateway"),
639                     agent = True,
640                     identity = self.get("identity"),
641                     server_key = self.get("serverKey"),
642                     strict_host_checking = False
643                 )
644            
645         return status
646     
647     def kill(self, pid, ppid, sudo = False):
648         out = err = ""
649         proc = None
650         status = self.status(pid, ppid)
651
652         if status == sshfuncs.ProcStatus.RUNNING:
653             if self.localhost:
654                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
655             else:
656                 with self._node_lock:
657                     (out, err), proc = sshfuncs.rkill(
658                         pid, ppid,
659                         host = self.get("hostname"),
660                         user = self.get("username"),
661                         port = self.get("port"),
662                         gwuser = self.get("gatewayUser"),
663                         gw = self.get("gateway"),
664                         agent = True,
665                         sudo = sudo,
666                         identity = self.get("identity"),
667                         server_key = self.get("serverKey"),
668                         strict_host_checking = False
669                     )
670
671         return (out, err), proc
672
673     def copy(self, src, dst):
674         if self.localhost:
675             (out, err), proc = execfuncs.lcopy(
676                 src, dst, 
677                 recursive = True)
678         else:
679             with self._node_lock:
680                 (out, err), proc = sshfuncs.rcopy(
681                     src, dst, 
682                     port = self.get("port"),
683                     gwuser = self.get("gatewayUser"),
684                     gw = self.get("gateway"),
685                     identity = self.get("identity"),
686                     server_key = self.get("serverKey"),
687                     recursive = True,
688                     strict_host_checking = False)
689
690         return (out, err), proc
691
692     def upload(self, src, dst, text = False, overwrite = True,
693                raise_on_error = True):
694         """ Copy content to destination
695
696         src  string with the content to copy. Can be:
697             - plain text
698             - a string with the path to a local file
699             - a string with a semi-colon separeted list of local files
700             - a string with a local directory
701
702         dst  string with destination path on the remote host (remote is 
703             always self.host)
704
705         text src is text input, it must be stored into a temp file before 
706         uploading
707         """
708         # If source is a string input 
709         f = None
710         if text and not os.path.isfile(src):
711             # src is text input that should be uploaded as file
712             # create a temporal file with the content to upload
713             f = tempfile.NamedTemporaryFile(delete=False)
714             f.write(src)
715             f.close()
716             src = f.name
717
718         # If dst files should not be overwritten, check that the files do not
719         # exits already
720         if isinstance(src, str):
721             src = map(str.strip, src.split(";"))
722     
723         if overwrite == False:
724             src = self.filter_existing_files(src, dst)
725             if not src:
726                 return ("", ""), None
727
728         if not self.localhost:
729             # Build destination as <user>@<server>:<path>
730             dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
731
732         ((out, err), proc) = self.copy(src, dst)
733
734         # clean up temp file
735         if f:
736             os.remove(f.name)
737
738         if err:
739             msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
740             self.error(msg, out, err)
741             
742             msg = "{} out: {} err: {}".format(msg, out, err)
743             if raise_on_error:
744                 raise RuntimeError, msg
745
746         return ((out, err), proc)
747
748     def download(self, src, dst, raise_on_error = True):
749         if not self.localhost:
750             # Build destination as <user>@<server>:<path>
751             src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
752
753         ((out, err), proc) = self.copy(src, dst)
754
755         if err:
756             msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst) 
757             self.error(msg, out, err)
758
759             if raise_on_error:
760                 raise RuntimeError, msg
761
762         return ((out, err), proc)
763
764     def install_packages_command(self, packages):
765         command = ""
766         if self.use_rpm:
767             command = rpmfuncs.install_packages_command(self.os, packages)
768         elif self.use_deb:
769             command = debfuncs.install_packages_command(self.os, packages)
770         else:
771             msg = "Error installing packages ( OS not known ) "
772             self.error(msg, self.os)
773             raise RuntimeError, msg
774
775         return command
776
777     def install_packages(self, packages, home,
778                          run_home = None,
779                          raise_on_error = True):
780         """ Install packages in the Linux host.
781
782         'home' is the directory to upload the package installation script.
783         'run_home' is the directory from where to execute the script.
784         """
785         command = self.install_packages_command(packages)
786
787         run_home = run_home or home
788
789         (out, err), proc = self.run_and_wait(command, run_home, 
790                                              shfile = os.path.join(home, "instpkg.sh"),
791                                              pidfile = "instpkg_pidfile",
792                                              ecodefile = "instpkg_exitcode",
793                                              stdout = "instpkg_stdout", 
794                                              stderr = "instpkg_stderr",
795                                              overwrite = False,
796                                              raise_on_error = raise_on_error)
797
798         return (out, err), proc 
799
800     def remove_packages(self, packages, home, run_home = None,
801                         raise_on_error = True):
802         """ Uninstall packages from the Linux host.
803
804         'home' is the directory to upload the package un-installation script.
805         'run_home' is the directory from where to execute the script.
806         """
807         if self.use_rpm:
808             command = rpmfuncs.remove_packages_command(self.os, packages)
809         elif self.use_deb:
810             command = debfuncs.remove_packages_command(self.os, packages)
811         else:
812             msg = "Error removing packages ( OS not known ) "
813             self.error(msg)
814             raise RuntimeError, msg
815
816         run_home = run_home or home
817
818         (out, err), proc = self.run_and_wait(command, run_home, 
819                                              shfile = os.path.join(home, "rmpkg.sh"),
820                                              pidfile = "rmpkg_pidfile",
821                                              ecodefile = "rmpkg_exitcode",
822                                              stdout = "rmpkg_stdout", 
823                                              stderr = "rmpkg_stderr",
824                                              overwrite = False,
825                                              raise_on_error = raise_on_error)
826         
827         return (out, err), proc 
828
829     def mkdir(self, paths, clean = False):
830         """ Paths is either a single remote directory path to create,
831         or a list of directories to create.
832         """
833         if clean:
834             self.rmdir(paths)
835
836         if isinstance(paths, str):
837             paths = [paths]
838
839         cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
840
841         return self.execute(cmd, with_lock = True)
842
843     def rmdir(self, paths):
844         """ Paths is either a single remote directory path to delete,
845         or a list of directories to delete.
846         """
847
848         if isinstance(paths, str):
849             paths = [paths]
850
851         cmd = " ; ".join(map(lambda path: "rm -rf {}".format(path), paths))
852
853         return self.execute(cmd, with_lock = True)
854     
855     def run_and_wait(self, command, home, 
856                      shfile="cmd.sh",
857                      env=None,
858                      overwrite=True,
859                      wait_run=True,
860                      pidfile="pidfile", 
861                      ecodefile="exitcode", 
862                      stdin=None, 
863                      stdout="stdout", 
864                      stderr="stderr", 
865                      sudo=False,
866                      tty=False,
867                      raise_on_error=True):
868         """
869         Uploads the 'command' to a bash script in the host.
870         Then runs the script detached in background in the host, and
871         busy-waites until the script finishes executing.
872         """
873
874         if not shfile.startswith("/"):
875             shfile = os.path.join(home, shfile)
876
877         self.upload_command(command, 
878                             shfile = shfile, 
879                             ecodefile = ecodefile, 
880                             env = env,
881                             overwrite = overwrite)
882
883         command = "bash {}".format(shfile)
884         # run command in background in remote host
885         (out, err), proc = self.run(command, home, 
886                                     pidfile = pidfile,
887                                     stdin = stdin, 
888                                     stdout = stdout, 
889                                     stderr = stderr, 
890                                     sudo = sudo,
891                                     tty = tty)
892
893         # check no errors occurred
894         if proc.poll():
895             msg = " Failed to run command '{}' ".format(command)
896             self.error(msg, out, err)
897             if raise_on_error:
898                 raise RuntimeError, msg
899
900         # Wait for pid file to be generated
901         pid, ppid = self.wait_pid(
902             home = home, 
903             pidfile = pidfile, 
904             raise_on_error = raise_on_error)
905
906         if wait_run:
907             # wait until command finishes to execute
908             self.wait_run(pid, ppid)
909             
910             (eout, err), proc = self.check_errors(home,
911                                                   ecodefile = ecodefile,
912                                                   stderr = stderr)
913
914             # Out is what was written in the stderr file
915             if err:
916                 msg = " Failed to run command '{}' ".format(command)
917                 self.error(msg, eout, err)
918
919                 if raise_on_error:
920                     raise RuntimeError, msg
921
922         (out, oerr), proc = self.check_output(home, stdout)
923         
924         return (out, err), proc
925         
926     def exitcode(self, home, ecodefile = "exitcode"):
927         """
928         Get the exit code of an application.
929         Returns an integer value with the exit code 
930         """
931         (out, err), proc = self.check_output(home, ecodefile)
932
933         # Succeeded to open file, return exit code in the file
934         if proc.wait() == 0:
935             try:
936                 return int(out.strip())
937             except:
938                 # Error in the content of the file!
939                 return ExitCode.CORRUPTFILE
940
941         # No such file or directory
942         if proc.returncode == 1:
943             return ExitCode.FILENOTFOUND
944         
945         # Other error from 'cat'
946         return ExitCode.ERROR
947
948     def upload_command(self, command, 
949                        shfile="cmd.sh",
950                        ecodefile="exitcode",
951                        overwrite=True,
952                        env=None):
953         """ Saves the command as a bash script file in the remote host, and
954         forces to save the exit code of the command execution to the ecodefile
955         """
956
957         if not (command.strip().endswith(";") or command.strip().endswith("&")):
958             command += ";"
959             
960         # The exit code of the command will be stored in ecodefile
961         command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
962                   .format(command=command, ecodefile=ecodefile)
963
964         # Export environment
965         environ = self.format_environment(env)
966
967         # Add environ to command
968         command = environ + command
969
970         return self.upload(command, shfile, text=True, overwrite=overwrite)
971
972     def format_environment(self, env, inline=False):
973         """ Formats the environment variables for a command to be executed
974         either as an inline command
975         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
976         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
977         """
978         if not env: return ""
979
980         # Remove extra white spaces
981         env = re.sub(r'\s+', ' ', env.strip())
982
983         sep = ";" if inline else "\n"
984         return sep.join([" export {}".format(e) for e in env.split(" ")]) + sep 
985
986     def check_errors(self, home, 
987                      ecodefile = "exitcode", 
988                      stderr = "stderr"):
989         """ Checks whether errors occurred while running a command.
990         It first checks the exit code for the command, and only if the
991         exit code is an error one it returns the error output.
992
993         """
994         proc = None
995         err = ""
996
997         # get exit code saved in the 'exitcode' file
998         ecode = self.exitcode(home, ecodefile)
999
1000         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1001             err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1002         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1003             # The process returned an error code or didn't exist. 
1004             # Check standard error.
1005             (err, eerr), proc = self.check_output(home, stderr)
1006
1007             # If the stderr file was not found, assume nothing bad happened,
1008             # and just ignore the error.
1009             # (cat returns 1 for error "No such file or directory")
1010             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1011                 err = "" 
1012                 
1013         return ("", err), proc
1014     
1015     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1016         """ Waits until the pid file for the command is generated, 
1017             and returns the pid and ppid of the process """
1018         pid = ppid = None
1019         delay = 1.0
1020
1021         for i in xrange(2):
1022             pidtuple = self.getpid(home = home, pidfile = pidfile)
1023             
1024             if pidtuple:
1025                 pid, ppid = pidtuple
1026                 break
1027             else:
1028                 time.sleep(delay)
1029                 delay = delay * 1.5
1030         else:
1031             msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1032             self.error(msg)
1033     
1034             if raise_on_error:
1035                 raise RuntimeError, msg
1036
1037         return pid, ppid
1038
1039     def wait_run(self, pid, ppid, trial = 0):
1040         """ wait for a remote process to finish execution """
1041         delay = 1.0
1042
1043         while True:
1044             status = self.status(pid, ppid)
1045             
1046             if status is ProcStatus.FINISHED:
1047                 break
1048             elif status is not ProcStatus.RUNNING:
1049                 delay = delay * 1.5
1050                 time.sleep(delay)
1051                 # If it takes more than 20 seconds to start, then
1052                 # asume something went wrong
1053                 if delay > 20:
1054                     break
1055             else:
1056                 # The app is running, just wait...
1057                 time.sleep(0.5)
1058
1059     def check_output(self, home, filename):
1060         """ Retrives content of file """
1061         (out, err), proc = self.execute(
1062             "cat {}".format(os.path.join(home, filename)), retry = 1, with_lock = True)
1063         return (out, err), proc
1064
1065     def is_alive(self):
1066         """ Checks if host is responsive
1067         """
1068         if self.localhost:
1069             return True
1070
1071         out = err = ""
1072         msg = "Unresponsive host. Wrong answer. "
1073
1074         # The underlying SSH layer will sometimes return an empty
1075         # output (even if the command was executed without errors).
1076         # To work arround this, repeat the operation N times or
1077         # until the result is not empty string
1078         try:
1079             (out, err), proc = self.execute("echo 'ALIVE'",
1080                                             blocking = True,
1081                                             with_lock = True)
1082             
1083             if out.find("ALIVE") > -1:
1084                 return True
1085         except:
1086             trace = traceback.format_exc()
1087             msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1088
1089         self.error(msg, out, err)
1090         return False
1091
1092     def find_home(self):
1093         """ 
1094         Retrieves host home directory
1095         """
1096         # The underlying SSH layer will sometimes return an empty
1097         # output (even if the command was executed without errors).
1098         # To work arround this, repeat the operation N times or
1099         # until the result is not empty string
1100         msg = "Impossible to retrieve HOME directory"
1101         try:
1102             (out, err), proc = self.execute("echo ${HOME}",
1103                                             blocking = True,
1104                                             with_lock = True)
1105             
1106             if out.strip() != "":
1107                 self._home_dir =  out.strip()
1108         except:
1109             trace = traceback.format_exc()
1110             msg = "Impossible to retrieve HOME directory {}".format(trace)
1111
1112         if not self._home_dir:
1113             self.error(msg)
1114             raise RuntimeError, msg
1115
1116     def filter_existing_files(self, src, dst):
1117         """ Removes files that already exist in the Linux host from src list
1118         """
1119         # construct a dictionary with { dst: src }
1120         dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1121                 if len(src) > 1 else {dst: src[0]}
1122
1123         command = []
1124         for d in dests.keys():
1125             command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1126
1127         command = ";".join(command)
1128
1129         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1130         
1131         for d in dests.keys():
1132             if out.find(d) > -1:
1133                 del dests[d]
1134
1135         if not dests:
1136             return []
1137
1138         return dests.values()
1139