Fixing LinuxNode error when checking for localhost
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.Design)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.Design)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.Design)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.Design)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.Design)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.Design)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.Design)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.Design)
190
191         gateway_user = Attribute("gatewayUser", "Gateway account username",
192                 flags = Flags.Design)
193
194         gateway = Attribute("gateway", "Hostname of the gateway machine",
195                 flags = Flags.Design)
196
197         cls._register_attribute(hostname)
198         cls._register_attribute(username)
199         cls._register_attribute(port)
200         cls._register_attribute(home)
201         cls._register_attribute(identity)
202         cls._register_attribute(server_key)
203         cls._register_attribute(clean_home)
204         cls._register_attribute(clean_experiment)
205         cls._register_attribute(clean_processes)
206         cls._register_attribute(tear_down)
207         cls._register_attribute(gateway_user)
208         cls._register_attribute(gateway)
209
210     def __init__(self, ec, guid):
211         super(LinuxNode, self).__init__(ec, guid)
212         self._os = None
213         # home directory at Linux host
214         self._home_dir = ""
215
216         # lock to prevent concurrent applications on the same node,
217         # to execute commands at the same time. There are potential
218         # concurrency issues when using SSH to a same host from 
219         # multiple threads. There are also possible operational 
220         # issues, e.g. an application querying the existence 
221         # of a file or folder prior to its creation, and another 
222         # application creating the same file or folder in between.
223         self._node_lock = threading.Lock()
224     
225     def log_message(self, msg):
226         return " guid %d - host %s - %s " % (self.guid, 
227                 self.get("hostname"), msg)
228
229     @property
230     def home_dir(self):
231         home = self.get("home") or ""
232         if not home.startswith("/"):
233            home = os.path.join(self._home_dir, home) 
234         return home
235
236     @property
237     def nepi_home(self):
238         return os.path.join(self.home_dir, ".nepi")
239
240     @property
241     def usr_dir(self):
242         return os.path.join(self.nepi_home, "nepi-usr")
243
244     @property
245     def lib_dir(self):
246         return os.path.join(self.usr_dir, "lib")
247
248     @property
249     def bin_dir(self):
250         return os.path.join(self.usr_dir, "bin")
251
252     @property
253     def src_dir(self):
254         return os.path.join(self.usr_dir, "src")
255
256     @property
257     def share_dir(self):
258         return os.path.join(self.usr_dir, "share")
259
260     @property
261     def exp_dir(self):
262         return os.path.join(self.nepi_home, "nepi-exp")
263
264     @property
265     def exp_home(self):
266         return os.path.join(self.exp_dir, self.ec.exp_id)
267
268     @property
269     def node_home(self):
270         return os.path.join(self.exp_home, "node-%d" % self.guid)
271
272     @property
273     def run_home(self):
274         return os.path.join(self.node_home, self.ec.run_id)
275
276     @property
277     def os(self):
278         if self._os:
279             return self._os
280
281         if not self.localhost and not self.get("username"):
282             msg = "Can't resolve OS, insufficient data "
283             self.error(msg)
284             raise RuntimeError, msg
285
286         out = self.get_os()
287
288         if out.find("Fedora release 8") == 0:
289             self._os = OSType.FEDORA_8
290         elif out.find("Fedora release 12") == 0:
291             self._os = OSType.FEDORA_12
292         elif out.find("Fedora release 14") == 0:
293             self._os = OSType.FEDORA_14
294         elif out.find("Fedora release") == 0:
295             self._os = OSType.FEDORA
296         elif out.find("Debian") == 0: 
297             self._os = OSType.DEBIAN
298         elif out.find("Ubuntu") ==0:
299             self._os = OSType.UBUNTU
300         else:
301             msg = "Unsupported OS"
302             self.error(msg, out)
303             raise RuntimeError, "%s - %s " %( msg, out )
304
305         return self._os
306
307     def get_os(self):
308         # The underlying SSH layer will sometimes return an empty
309         # output (even if the command was executed without errors).
310         # To work arround this, repeat the operation N times or
311         # until the result is not empty string
312         out = ""
313         try:
314             (out, err), proc = self.execute("cat /etc/issue", 
315                     with_lock = True,
316                     blocking = True)
317         except:
318             trace = traceback.format_exc()
319             msg = "Error detecting OS: %s " % trace
320             self.error(msg, out, err)
321         
322         return out
323
324     @property
325     def use_deb(self):
326         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
327
328     @property
329     def use_rpm(self):
330         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
331                 OSType.FEDORA]
332
333     @property
334     def localhost(self):
335         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
336
337     def do_provision(self):
338         # check if host is alive
339         if not self.is_alive():
340             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
341             self.error(msg)
342             raise RuntimeError, msg
343
344         self.find_home()
345
346         if self.get("cleanProcesses"):
347             self.clean_processes()
348
349         if self.get("cleanHome"):
350             self.clean_home()
351  
352         if self.get("cleanExperiment"):
353             self.clean_experiment()
354     
355         # Create shared directory structure and node home directory
356         paths = [self.lib_dir, 
357             self.bin_dir, 
358             self.src_dir, 
359             self.share_dir, 
360             self.node_home]
361
362         self.mkdir(paths)
363
364         super(LinuxNode, self).do_provision()
365
366     def do_deploy(self):
367         if self.state == ResourceState.NEW:
368             self.info("Deploying node")
369             self.do_discover()
370             self.do_provision()
371
372         # Node needs to wait until all associated interfaces are 
373         # ready before it can finalize deployment
374         from nepi.resources.linux.interface import LinuxInterface
375         ifaces = self.get_connected(LinuxInterface.get_rtype())
376         for iface in ifaces:
377             if iface.state < ResourceState.READY:
378                 self.ec.schedule(reschedule_delay, self.deploy)
379                 return 
380
381         super(LinuxNode, self).do_deploy()
382
383     def do_release(self):
384         rms = self.get_connected()
385         for rm in rms:
386             # Node needs to wait until all associated RMs are released
387             # before it can be released
388             if rm.state != ResourceState.RELEASED:
389                 self.ec.schedule(reschedule_delay, self.release)
390                 return 
391
392         tear_down = self.get("tearDown")
393         if tear_down:
394             self.execute(tear_down)
395
396         self.clean_processes()
397
398         super(LinuxNode, self).do_release()
399
400     def valid_connection(self, guid):
401         # TODO: Validate!
402         return True
403
404     def clean_processes(self):
405         self.info("Cleaning up processes")
406
407         if self.localhost:
408             return 
409         
410         if self.get("username") != 'root':
411             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
412                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
413         else:
414             if self.state >= ResourceState.READY:
415                 import pickle
416                 pids = pickle.load(open("/tmp/save.proc", "rb"))
417                 pids_temp = dict()
418                 ps_aux = "ps aux |awk '{print $2,$11}'"
419                 (out, err), proc = self.execute(ps_aux)
420                 if len(out) != 0:
421                     for line in out.strip().split("\n"):
422                         parts = line.strip().split(" ")
423                         pids_temp[parts[0]] = parts[1]
424                     kill_pids = set(pids_temp.items()) - set(pids.items())
425                     kill_pids = ' '.join(dict(kill_pids).keys())
426
427                     cmd = ("killall tcpdump || /bin/true ; " +
428                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
429                         "kill %s || /bin/true ; " % kill_pids)
430                 else:
431                     cmd = ("killall tcpdump || /bin/true ; " +
432                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
433             else:
434                 cmd = ("killall tcpdump || /bin/true ; " +
435                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
436
437         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
438
439     def clean_home(self):
440         """ Cleans all NEPI related folders in the Linux host
441         """
442         self.info("Cleaning up home")
443         
444         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
445                 self.home_dir )
446
447         return self.execute(cmd, with_lock = True)
448
449     def clean_experiment(self):
450         """ Cleans all experiment related files in the Linux host.
451         It preserves NEPI files and folders that have a multi experiment
452         scope.
453         """
454         self.info("Cleaning up experiment files")
455         
456         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
457                 self.exp_dir,
458                 self.ec.exp_id )
459             
460         return self.execute(cmd, with_lock = True)
461
462     def execute(self, command,
463             sudo = False,
464             env = None,
465             tty = False,
466             forward_x11 = False,
467             retry = 3,
468             connect_timeout = 30,
469             strict_host_checking = False,
470             persistent = True,
471             blocking = True,
472             with_lock = False
473             ):
474         """ Notice that this invocation will block until the
475         execution finishes. If this is not the desired behavior,
476         use 'run' instead."""
477
478         if self.localhost:
479             (out, err), proc = execfuncs.lexec(command, 
480                     user = self.get("username"), # still problem with localhost
481                     sudo = sudo,
482                     env = env)
483         else:
484             if with_lock:
485                 # If the execute command is blocking, we don't want to keep
486                 # the node lock. This lock is used to avoid race conditions
487                 # when creating the ControlMaster sockets. A more elegant
488                 # solution is needed.
489                 with self._node_lock:
490                     (out, err), proc = sshfuncs.rexec(
491                         command, 
492                         host = self.get("hostname"),
493                         user = self.get("username"),
494                         port = self.get("port"),
495                         gwuser = self.get("gatewayUser"),
496                         gw = self.get("gateway"),
497                         agent = True,
498                         sudo = sudo,
499                         identity = self.get("identity"),
500                         server_key = self.get("serverKey"),
501                         env = env,
502                         tty = tty,
503                         forward_x11 = forward_x11,
504                         retry = retry,
505                         connect_timeout = connect_timeout,
506                         persistent = persistent,
507                         blocking = blocking, 
508                         strict_host_checking = strict_host_checking
509                         )
510             else:
511                 (out, err), proc = sshfuncs.rexec(
512                     command, 
513                     host = self.get("hostname"),
514                     user = self.get("username"),
515                     port = self.get("port"),
516                     gwuser = self.get("gatewayUser"),
517                     gw = self.get("gateway"),
518                     agent = True,
519                     sudo = sudo,
520                     identity = self.get("identity"),
521                     server_key = self.get("serverKey"),
522                     env = env,
523                     tty = tty,
524                     forward_x11 = forward_x11,
525                     retry = retry,
526                     connect_timeout = connect_timeout,
527                     persistent = persistent,
528                     blocking = blocking, 
529                     strict_host_checking = strict_host_checking
530                     )
531
532         return (out, err), proc
533
534     def run(self, command, home,
535             create_home = False,
536             pidfile = 'pidfile',
537             stdin = None, 
538             stdout = 'stdout', 
539             stderr = 'stderr', 
540             sudo = False,
541             tty = False):
542         
543         self.debug("Running command '%s'" % command)
544         
545         if self.localhost:
546             (out, err), proc = execfuncs.lspawn(command, pidfile,
547                     home = home, 
548                     create_home = create_home, 
549                     stdin = stdin or '/dev/null',
550                     stdout = stdout or '/dev/null',
551                     stderr = stderr or '/dev/null',
552                     sudo = sudo) 
553         else:
554             with self._node_lock:
555                 (out, err), proc = sshfuncs.rspawn(
556                     command,
557                     pidfile = pidfile,
558                     home = home,
559                     create_home = create_home,
560                     stdin = stdin or '/dev/null',
561                     stdout = stdout or '/dev/null',
562                     stderr = stderr or '/dev/null',
563                     sudo = sudo,
564                     host = self.get("hostname"),
565                     user = self.get("username"),
566                     port = self.get("port"),
567                     gwuser = self.get("gatewayUser"),
568                     gw = self.get("gateway"),
569                     agent = True,
570                     identity = self.get("identity"),
571                     server_key = self.get("serverKey"),
572                     tty = tty
573                     )
574
575         return (out, err), proc
576
577     def getpid(self, home, pidfile = "pidfile"):
578         if self.localhost:
579             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
580         else:
581             with self._node_lock:
582                 pidtuple = sshfuncs.rgetpid(
583                     os.path.join(home, pidfile),
584                     host = self.get("hostname"),
585                     user = self.get("username"),
586                     port = self.get("port"),
587                     gwuser = self.get("gatewayUser"),
588                     gw = self.get("gateway"),
589                     agent = True,
590                     identity = self.get("identity"),
591                     server_key = self.get("serverKey")
592                     )
593         
594         return pidtuple
595
596     def status(self, pid, ppid):
597         if self.localhost:
598             status = execfuncs.lstatus(pid, ppid)
599         else:
600             with self._node_lock:
601                 status = sshfuncs.rstatus(
602                         pid, ppid,
603                         host = self.get("hostname"),
604                         user = self.get("username"),
605                         port = self.get("port"),
606                         gwuser = self.get("gatewayUser"),
607                         gw = self.get("gateway"),
608                         agent = True,
609                         identity = self.get("identity"),
610                         server_key = self.get("serverKey")
611                         )
612            
613         return status
614     
615     def kill(self, pid, ppid, sudo = False):
616         out = err = ""
617         proc = None
618         status = self.status(pid, ppid)
619
620         if status == sshfuncs.ProcStatus.RUNNING:
621             if self.localhost:
622                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
623             else:
624                 with self._node_lock:
625                     (out, err), proc = sshfuncs.rkill(
626                         pid, ppid,
627                         host = self.get("hostname"),
628                         user = self.get("username"),
629                         port = self.get("port"),
630                         gwuser = self.get("gatewayUser"),
631                         gw = self.get("gateway"),
632                         agent = True,
633                         sudo = sudo,
634                         identity = self.get("identity"),
635                         server_key = self.get("serverKey")
636                         )
637
638         return (out, err), proc
639
640     def copy(self, src, dst):
641         if self.localhost:
642             (out, err), proc = execfuncs.lcopy(src, dst, 
643                     recursive = True)
644         else:
645             with self._node_lock:
646                 (out, err), proc = sshfuncs.rcopy(
647                     src, dst, 
648                     port = self.get("port"),
649                     gwuser = self.get("gatewayUser"),
650                     gw = self.get("gateway"),
651                     identity = self.get("identity"),
652                     server_key = self.get("serverKey"),
653                     recursive = True,
654                     strict_host_checking = False)
655
656         return (out, err), proc
657
658     def upload(self, src, dst, text = False, overwrite = True,
659             raise_on_error = True):
660         """ Copy content to destination
661
662         src  string with the content to copy. Can be:
663             - plain text
664             - a string with the path to a local file
665             - a string with a semi-colon separeted list of local files
666             - a string with a local directory
667
668         dst  string with destination path on the remote host (remote is 
669             always self.host)
670
671         text src is text input, it must be stored into a temp file before 
672         uploading
673         """
674         # If source is a string input 
675         f = None
676         if text and not os.path.isfile(src):
677             # src is text input that should be uploaded as file
678             # create a temporal file with the content to upload
679             f = tempfile.NamedTemporaryFile(delete=False)
680             f.write(src)
681             f.close()
682             src = f.name
683
684         # If dst files should not be overwritten, check that the files do not
685         # exits already
686         if isinstance(src, str):
687             src = map(str.strip, src.split(";"))
688     
689         if overwrite == False:
690             src = self.filter_existing_files(src, dst)
691             if not src:
692                 return ("", ""), None
693
694         if not self.localhost:
695             # Build destination as <user>@<server>:<path>
696             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
697
698         ((out, err), proc) = self.copy(src, dst)
699
700         # clean up temp file
701         if f:
702             os.remove(f.name)
703
704         if err:
705             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
706             self.error(msg, out, err)
707             
708             msg = "%s out: %s err: %s" % (msg, out, err)
709             if raise_on_error:
710                 raise RuntimeError, msg
711
712         return ((out, err), proc)
713
714     def download(self, src, dst, raise_on_error = True):
715         if not self.localhost:
716             # Build destination as <user>@<server>:<path>
717             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
718
719         ((out, err), proc) = self.copy(src, dst)
720
721         if err:
722             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
723             self.error(msg, out, err)
724
725             if raise_on_error:
726                 raise RuntimeError, msg
727
728         return ((out, err), proc)
729
730     def install_packages_command(self, packages):
731         command = ""
732         if self.use_rpm:
733             command = rpmfuncs.install_packages_command(self.os, packages)
734         elif self.use_deb:
735             command = debfuncs.install_packages_command(self.os, packages)
736         else:
737             msg = "Error installing packages ( OS not known ) "
738             self.error(msg, self.os)
739             raise RuntimeError, msg
740
741         return command
742
743     def install_packages(self, packages, home, run_home = None,
744             raise_on_error = True):
745         """ Install packages in the Linux host.
746
747         'home' is the directory to upload the package installation script.
748         'run_home' is the directory from where to execute the script.
749         """
750         command = self.install_packages_command(packages)
751
752         run_home = run_home or home
753
754         (out, err), proc = self.run_and_wait(command, run_home, 
755             shfile = os.path.join(home, "instpkg.sh"),
756             pidfile = "instpkg_pidfile",
757             ecodefile = "instpkg_exitcode",
758             stdout = "instpkg_stdout", 
759             stderr = "instpkg_stderr",
760             overwrite = False,
761             raise_on_error = raise_on_error)
762
763         return (out, err), proc 
764
765     def remove_packages(self, packages, home, run_home = None,
766             raise_on_error = True):
767         """ Uninstall packages from the Linux host.
768
769         'home' is the directory to upload the package un-installation script.
770         'run_home' is the directory from where to execute the script.
771         """
772         if self.use_rpm:
773             command = rpmfuncs.remove_packages_command(self.os, packages)
774         elif self.use_deb:
775             command = debfuncs.remove_packages_command(self.os, packages)
776         else:
777             msg = "Error removing packages ( OS not known ) "
778             self.error(msg)
779             raise RuntimeError, msg
780
781         run_home = run_home or home
782
783         (out, err), proc = self.run_and_wait(command, run_home, 
784             shfile = os.path.join(home, "rmpkg.sh"),
785             pidfile = "rmpkg_pidfile",
786             ecodefile = "rmpkg_exitcode",
787             stdout = "rmpkg_stdout", 
788             stderr = "rmpkg_stderr",
789             overwrite = False,
790             raise_on_error = raise_on_error)
791          
792         return (out, err), proc 
793
794     def mkdir(self, paths, clean = False):
795         """ Paths is either a single remote directory path to create,
796         or a list of directories to create.
797         """
798         if clean:
799             self.rmdir(paths)
800
801         if isinstance(paths, str):
802             paths = [paths]
803
804         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
805
806         return self.execute(cmd, with_lock = True)
807
808     def rmdir(self, paths):
809         """ Paths is either a single remote directory path to delete,
810         or a list of directories to delete.
811         """
812
813         if isinstance(paths, str):
814             paths = [paths]
815
816         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
817
818         return self.execute(cmd, with_lock = True)
819         
820     def run_and_wait(self, command, home, 
821             shfile = "cmd.sh",
822             env = None,
823             overwrite = True,
824             pidfile = "pidfile", 
825             ecodefile = "exitcode", 
826             stdin = None, 
827             stdout = "stdout", 
828             stderr = "stderr", 
829             sudo = False,
830             tty = False,
831             raise_on_error = True):
832         """
833         Uploads the 'command' to a bash script in the host.
834         Then runs the script detached in background in the host, and
835         busy-waites until the script finishes executing.
836         """
837
838         if not shfile.startswith("/"):
839             shfile = os.path.join(home, shfile)
840
841         self.upload_command(command, 
842             shfile = shfile, 
843             ecodefile = ecodefile, 
844             env = env,
845             overwrite = overwrite)
846
847         command = "bash %s" % shfile
848         # run command in background in remote host
849         (out, err), proc = self.run(command, home, 
850                 pidfile = pidfile,
851                 stdin = stdin, 
852                 stdout = stdout, 
853                 stderr = stderr, 
854                 sudo = sudo,
855                 tty = tty)
856
857         # check no errors occurred
858         if proc.poll():
859             msg = " Failed to run command '%s' " % command
860             self.error(msg, out, err)
861             if raise_on_error:
862                 raise RuntimeError, msg
863
864         # Wait for pid file to be generated
865         pid, ppid = self.wait_pid(
866                 home = home, 
867                 pidfile = pidfile, 
868                 raise_on_error = raise_on_error)
869
870         # wait until command finishes to execute
871         self.wait_run(pid, ppid)
872       
873         (eout, err), proc = self.check_errors(home,
874             ecodefile = ecodefile,
875             stderr = stderr)
876
877         # Out is what was written in the stderr file
878         if err:
879             msg = " Failed to run command '%s' " % command
880             self.error(msg, eout, err)
881
882             if raise_on_error:
883                 raise RuntimeError, msg
884
885         (out, oerr), proc = self.check_output(home, stdout)
886         
887         return (out, err), proc
888
889     def exitcode(self, home, ecodefile = "exitcode"):
890         """
891         Get the exit code of an application.
892         Returns an integer value with the exit code 
893         """
894         (out, err), proc = self.check_output(home, ecodefile)
895
896         # Succeeded to open file, return exit code in the file
897         if proc.wait() == 0:
898             try:
899                 return int(out.strip())
900             except:
901                 # Error in the content of the file!
902                 return ExitCode.CORRUPTFILE
903
904         # No such file or directory
905         if proc.returncode == 1:
906             return ExitCode.FILENOTFOUND
907         
908         # Other error from 'cat'
909         return ExitCode.ERROR
910
911     def upload_command(self, command, 
912             shfile = "cmd.sh",
913             ecodefile = "exitcode",
914             overwrite = True,
915             env = None):
916         """ Saves the command as a bash script file in the remote host, and
917         forces to save the exit code of the command execution to the ecodefile
918         """
919
920         if not (command.strip().endswith(";") or command.strip().endswith("&")):
921             command += ";"
922       
923         # The exit code of the command will be stored in ecodefile
924         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
925                 'command': command,
926                 'ecodefile': ecodefile,
927                 } 
928
929         # Export environment
930         environ = self.format_environment(env)
931
932         # Add environ to command
933         command = environ + command
934
935         return self.upload(command, shfile, text = True, overwrite = overwrite)
936
937     def format_environment(self, env, inline = False):
938         """ Formats the environment variables for a command to be executed
939         either as an inline command
940         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
941         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
942         """
943         if not env: return ""
944
945         # Remove extra white spaces
946         env = re.sub(r'\s+', ' ', env.strip())
947
948         sep = ";" if inline else "\n"
949         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
950
951     def check_errors(self, home, 
952             ecodefile = "exitcode", 
953             stderr = "stderr"):
954         """ Checks whether errors occurred while running a command.
955         It first checks the exit code for the command, and only if the
956         exit code is an error one it returns the error output.
957
958         """
959         proc = None
960         err = ""
961
962         # get exit code saved in the 'exitcode' file
963         ecode = self.exitcode(home, ecodefile)
964
965         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
966             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
967         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
968             # The process returned an error code or didn't exist. 
969             # Check standard error.
970             (err, eerr), proc = self.check_output(home, stderr)
971
972             # If the stderr file was not found, assume nothing bad happened,
973             # and just ignore the error.
974             # (cat returns 1 for error "No such file or directory")
975             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
976                 err = "" 
977             
978         return ("", err), proc
979  
980     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
981         """ Waits until the pid file for the command is generated, 
982             and returns the pid and ppid of the process """
983         pid = ppid = None
984         delay = 1.0
985
986         for i in xrange(2):
987             pidtuple = self.getpid(home = home, pidfile = pidfile)
988             
989             if pidtuple:
990                 pid, ppid = pidtuple
991                 break
992             else:
993                 time.sleep(delay)
994                 delay = delay * 1.5
995         else:
996             msg = " Failed to get pid for pidfile %s/%s " % (
997                     home, pidfile )
998             self.error(msg)
999             
1000             if raise_on_error:
1001                 raise RuntimeError, msg
1002
1003         return pid, ppid
1004
1005     def wait_run(self, pid, ppid, trial = 0):
1006         """ wait for a remote process to finish execution """
1007         delay = 1.0
1008
1009         while True:
1010             status = self.status(pid, ppid)
1011             
1012             if status is ProcStatus.FINISHED:
1013                 break
1014             elif status is not ProcStatus.RUNNING:
1015                 delay = delay * 1.5
1016                 time.sleep(delay)
1017                 # If it takes more than 20 seconds to start, then
1018                 # asume something went wrong
1019                 if delay > 20:
1020                     break
1021             else:
1022                 # The app is running, just wait...
1023                 time.sleep(0.5)
1024
1025     def check_output(self, home, filename):
1026         """ Retrives content of file """
1027         (out, err), proc = self.execute("cat %s" % 
1028             os.path.join(home, filename), retry = 1, with_lock = True)
1029         return (out, err), proc
1030
1031     def is_alive(self):
1032         """ Checks if host is responsive
1033         """
1034         if self.localhost:
1035             return True
1036
1037         out = err = ""
1038         msg = "Unresponsive host. Wrong answer. "
1039
1040         # The underlying SSH layer will sometimes return an empty
1041         # output (even if the command was executed without errors).
1042         # To work arround this, repeat the operation N times or
1043         # until the result is not empty string
1044         try:
1045             (out, err), proc = self.execute("echo 'ALIVE'",
1046                     blocking = True,
1047                     with_lock = True)
1048     
1049             if out.find("ALIVE") > -1:
1050                 return True
1051         except:
1052             trace = traceback.format_exc()
1053             msg = "Unresponsive host. Error reaching host: %s " % trace
1054
1055         self.error(msg, out, err)
1056         return False
1057
1058     def find_home(self):
1059         """ Retrieves host home directory
1060         """
1061         # The underlying SSH layer will sometimes return an empty
1062         # output (even if the command was executed without errors).
1063         # To work arround this, repeat the operation N times or
1064         # until the result is not empty string
1065         msg = "Impossible to retrieve HOME directory"
1066         try:
1067             (out, err), proc = self.execute("echo ${HOME}",
1068                     blocking = True,
1069                     with_lock = True)
1070     
1071             if out.strip() != "":
1072                 self._home_dir =  out.strip()
1073         except:
1074             trace = traceback.format_exc()
1075             msg = "Impossible to retrieve HOME directory %s" % trace
1076
1077         if not self._home_dir:
1078             self.error(msg)
1079             raise RuntimeError, msg
1080
1081     def filter_existing_files(self, src, dst):
1082         """ Removes files that already exist in the Linux host from src list
1083         """
1084         # construct a dictionary with { dst: src }
1085         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1086                     if len(src) > 1 else dict({dst: src[0]})
1087
1088         command = []
1089         for d in dests.keys():
1090             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1091
1092         command = ";".join(command)
1093
1094         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1095     
1096         for d in dests.keys():
1097             if out.find(d) > -1:
1098                 del dests[d]
1099
1100         if not dests:
1101             return []
1102
1103         return dests.values()
1104