Fix #128 - [NS3] Test ns-3 with localhost
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.Design)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.Design)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.Design)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.Design)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.Design)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.Design)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.Design)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.Design)
190
191         gateway_user = Attribute("gatewayUser", "Gateway account username",
192                 flags = Flags.Design)
193
194         gateway = Attribute("gateway", "Hostname of the gateway machine",
195                 flags = Flags.Design)
196
197         cls._register_attribute(hostname)
198         cls._register_attribute(username)
199         cls._register_attribute(port)
200         cls._register_attribute(home)
201         cls._register_attribute(identity)
202         cls._register_attribute(server_key)
203         cls._register_attribute(clean_home)
204         cls._register_attribute(clean_experiment)
205         cls._register_attribute(clean_processes)
206         cls._register_attribute(tear_down)
207         cls._register_attribute(gateway_user)
208         cls._register_attribute(gateway)
209
210     def __init__(self, ec, guid):
211         super(LinuxNode, self).__init__(ec, guid)
212         self._os = None
213         # home directory at Linux host
214         self._home_dir = ""
215
216         # lock to prevent concurrent applications on the same node,
217         # to execute commands at the same time. There are potential
218         # concurrency issues when using SSH to a same host from 
219         # multiple threads. There are also possible operational 
220         # issues, e.g. an application querying the existence 
221         # of a file or folder prior to its creation, and another 
222         # application creating the same file or folder in between.
223         self._node_lock = threading.Lock()
224     
225     def log_message(self, msg):
226         return " guid %d - host %s - %s " % (self.guid, 
227                 self.get("hostname"), msg)
228
229     @property
230     def home_dir(self):
231         home = self.get("home") or ""
232         if not home.startswith("/"):
233            home = os.path.join(self._home_dir, home) 
234         return home
235
236     @property
237     def usr_dir(self):
238         return os.path.join(self.home_dir, "nepi-usr")
239
240     @property
241     def lib_dir(self):
242         return os.path.join(self.usr_dir, "lib")
243
244     @property
245     def bin_dir(self):
246         return os.path.join(self.usr_dir, "bin")
247
248     @property
249     def src_dir(self):
250         return os.path.join(self.usr_dir, "src")
251
252     @property
253     def share_dir(self):
254         return os.path.join(self.usr_dir, "share")
255
256     @property
257     def exp_dir(self):
258         return os.path.join(self.home_dir, "nepi-exp")
259
260     @property
261     def exp_home(self):
262         return os.path.join(self.exp_dir, self.ec.exp_id)
263
264     @property
265     def node_home(self):
266         return os.path.join(self.exp_home, "node-%d" % self.guid)
267
268     @property
269     def run_home(self):
270         return os.path.join(self.node_home, self.ec.run_id)
271
272     @property
273     def os(self):
274         if self._os:
275             return self._os
276
277         if self.get("hostname") not in ["localhost", "127.0.0.1"] and \
278                 not self.get("username"):
279             msg = "Can't resolve OS, insufficient data "
280             self.error(msg)
281             raise RuntimeError, msg
282
283         out = self.get_os()
284
285         if out.find("Fedora release 8") == 0:
286             self._os = OSType.FEDORA_8
287         elif out.find("Fedora release 12") == 0:
288             self._os = OSType.FEDORA_12
289         elif out.find("Fedora release 14") == 0:
290             self._os = OSType.FEDORA_14
291         elif out.find("Fedora release") == 0:
292             self._os = OSType.FEDORA
293         elif out.find("Debian") == 0: 
294             self._os = OSType.DEBIAN
295         elif out.find("Ubuntu") ==0:
296             self._os = OSType.UBUNTU
297         else:
298             msg = "Unsupported OS"
299             self.error(msg, out)
300             raise RuntimeError, "%s - %s " %( msg, out )
301
302         return self._os
303
304     def get_os(self):
305         # The underlying SSH layer will sometimes return an empty
306         # output (even if the command was executed without errors).
307         # To work arround this, repeat the operation N times or
308         # until the result is not empty string
309         out = ""
310         try:
311             (out, err), proc = self.execute("cat /etc/issue", 
312                     with_lock = True,
313                     blocking = True)
314         except:
315             trace = traceback.format_exc()
316             msg = "Error detecting OS: %s " % trace
317             self.error(msg, out, err)
318         
319         return out
320
321     @property
322     def use_deb(self):
323         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
324
325     @property
326     def use_rpm(self):
327         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
328                 OSType.FEDORA]
329
330     @property
331     def localhost(self):
332         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
333
334     def do_provision(self):
335         # check if host is alive
336         if not self.is_alive():
337             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
338             self.error(msg)
339             raise RuntimeError, msg
340
341         self.find_home()
342
343         if self.get("cleanProcesses"):
344             self.clean_processes()
345
346         if self.get("cleanHome"):
347             self.clean_home()
348  
349         if self.get("cleanExperiment"):
350             self.clean_experiment()
351     
352         # Create shared directory structure
353         self.mkdir(self.lib_dir)
354         self.mkdir(self.bin_dir)
355         self.mkdir(self.src_dir)
356         self.mkdir(self.share_dir)
357
358         # Create experiment node home directory
359         self.mkdir(self.node_home)
360
361         super(LinuxNode, self).do_provision()
362
363     def do_deploy(self):
364         if self.state == ResourceState.NEW:
365             self.info("Deploying node")
366             self.do_discover()
367             self.do_provision()
368
369         # Node needs to wait until all associated interfaces are 
370         # ready before it can finalize deployment
371         from nepi.resources.linux.interface import LinuxInterface
372         ifaces = self.get_connected(LinuxInterface.get_rtype())
373         for iface in ifaces:
374             if iface.state < ResourceState.READY:
375                 self.ec.schedule(reschedule_delay, self.deploy)
376                 return 
377
378         super(LinuxNode, self).do_deploy()
379
380     def do_release(self):
381         rms = self.get_connected()
382         for rm in rms:
383             # Node needs to wait until all associated RMs are released
384             # before it can be released
385             if rm.state != ResourceState.RELEASED:
386                 self.ec.schedule(reschedule_delay, self.release)
387                 return 
388
389         tear_down = self.get("tearDown")
390         if tear_down:
391             self.execute(tear_down)
392
393         self.clean_processes()
394
395         super(LinuxNode, self).do_release()
396
397     def valid_connection(self, guid):
398         # TODO: Validate!
399         return True
400
401     def clean_processes(self):
402         self.info("Cleaning up processes")
403  
404         if self.get("hostname") in ["localhost", "127.0.0.2"]:
405             return 
406         
407         if self.get("username") != 'root':
408             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
409                 "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
410                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
411         else:
412             if self.state >= ResourceState.READY:
413                 import pickle
414                 pids = pickle.load(open("/tmp/save.proc", "rb"))
415                 pids_temp = dict()
416                 ps_aux = "ps aux |awk '{print $2,$11}'"
417                 (out, err), proc = self.execute(ps_aux)
418                 for line in out.strip().split("\n"):
419                     parts = line.strip().split(" ")
420                     pids_temp[parts[0]] = parts[1]
421                 kill_pids = set(pids_temp.items()) - set(pids.items())
422                 kill_pids = ' '.join(dict(kill_pids).keys())
423
424                 cmd = ("killall tcpdump || /bin/true ; " +
425                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
426                     "kill %s || /bin/true ; " % kill_pids)
427             else:
428                 cmd = ("killall tcpdump || /bin/true ; " +
429                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
430
431         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
432
433     def clean_home(self):
434         """ Cleans all NEPI related folders in the Linux host
435         """
436         self.info("Cleaning up home")
437         
438         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
439                 self.home_dir )
440
441         return self.execute(cmd, with_lock = True)
442
443     def clean_experiment(self):
444         """ Cleans all experiment related files in the Linux host.
445         It preserves NEPI files and folders that have a multi experiment
446         scope.
447         """
448         self.info("Cleaning up experiment files")
449         
450         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
451                 self.exp_dir,
452                 self.ec.exp_id )
453             
454         return self.execute(cmd, with_lock = True)
455
456     def execute(self, command,
457             sudo = False,
458             env = None,
459             tty = False,
460             forward_x11 = False,
461             retry = 3,
462             connect_timeout = 30,
463             strict_host_checking = False,
464             persistent = True,
465             blocking = True,
466             with_lock = False
467             ):
468         """ Notice that this invocation will block until the
469         execution finishes. If this is not the desired behavior,
470         use 'run' instead."""
471
472         if self.localhost:
473             (out, err), proc = execfuncs.lexec(command, 
474                     user = self.get("username"), # still problem with localhost
475                     sudo = sudo,
476                     env = env)
477         else:
478             if with_lock:
479                 # If the execute command is blocking, we don't want to keep
480                 # the node lock. This lock is used to avoid race conditions
481                 # when creating the ControlMaster sockets. A more elegant
482                 # solution is needed.
483                 with self._node_lock:
484                     (out, err), proc = sshfuncs.rexec(
485                         command, 
486                         host = self.get("hostname"),
487                         user = self.get("username"),
488                         port = self.get("port"),
489                         gwuser = self.get("gatewayUser"),
490                         gw = self.get("gateway"),
491                         agent = True,
492                         sudo = sudo,
493                         identity = self.get("identity"),
494                         server_key = self.get("serverKey"),
495                         env = env,
496                         tty = tty,
497                         forward_x11 = forward_x11,
498                         retry = retry,
499                         connect_timeout = connect_timeout,
500                         persistent = persistent,
501                         blocking = blocking, 
502                         strict_host_checking = strict_host_checking
503                         )
504             else:
505                 (out, err), proc = sshfuncs.rexec(
506                     command, 
507                     host = self.get("hostname"),
508                     user = self.get("username"),
509                     port = self.get("port"),
510                     gwuser = self.get("gatewayUser"),
511                     gw = self.get("gateway"),
512                     agent = True,
513                     sudo = sudo,
514                     identity = self.get("identity"),
515                     server_key = self.get("serverKey"),
516                     env = env,
517                     tty = tty,
518                     forward_x11 = forward_x11,
519                     retry = retry,
520                     connect_timeout = connect_timeout,
521                     persistent = persistent,
522                     blocking = blocking, 
523                     strict_host_checking = strict_host_checking
524                     )
525
526         return (out, err), proc
527
528     def run(self, command, home,
529             create_home = False,
530             pidfile = 'pidfile',
531             stdin = None, 
532             stdout = 'stdout', 
533             stderr = 'stderr', 
534             sudo = False,
535             tty = False):
536         
537         self.debug("Running command '%s'" % command)
538         
539         if self.localhost:
540             (out, err), proc = execfuncs.lspawn(command, pidfile,
541                     home = home, 
542                     create_home = create_home, 
543                     stdin = stdin or '/dev/null',
544                     stdout = stdout or '/dev/null',
545                     stderr = stderr or '/dev/null',
546                     sudo = sudo) 
547         else:
548             with self._node_lock:
549                 (out, err), proc = sshfuncs.rspawn(
550                     command,
551                     pidfile = pidfile,
552                     home = home,
553                     create_home = create_home,
554                     stdin = stdin or '/dev/null',
555                     stdout = stdout or '/dev/null',
556                     stderr = stderr or '/dev/null',
557                     sudo = sudo,
558                     host = self.get("hostname"),
559                     user = self.get("username"),
560                     port = self.get("port"),
561                     gwuser = self.get("gatewayUser"),
562                     gw = self.get("gateway"),
563                     agent = True,
564                     identity = self.get("identity"),
565                     server_key = self.get("serverKey"),
566                     tty = tty
567                     )
568
569         return (out, err), proc
570
571     def getpid(self, home, pidfile = "pidfile"):
572         if self.localhost:
573             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
574         else:
575             with self._node_lock:
576                 pidtuple = sshfuncs.rgetpid(
577                     os.path.join(home, pidfile),
578                     host = self.get("hostname"),
579                     user = self.get("username"),
580                     port = self.get("port"),
581                     gwuser = self.get("gatewayUser"),
582                     gw = self.get("gateway"),
583                     agent = True,
584                     identity = self.get("identity"),
585                     server_key = self.get("serverKey")
586                     )
587         
588         return pidtuple
589
590     def status(self, pid, ppid):
591         if self.localhost:
592             status = execfuncs.lstatus(pid, ppid)
593         else:
594             with self._node_lock:
595                 status = sshfuncs.rstatus(
596                         pid, ppid,
597                         host = self.get("hostname"),
598                         user = self.get("username"),
599                         port = self.get("port"),
600                         gwuser = self.get("gatewayUser"),
601                         gw = self.get("gateway"),
602                         agent = True,
603                         identity = self.get("identity"),
604                         server_key = self.get("serverKey")
605                         )
606            
607         return status
608     
609     def kill(self, pid, ppid, sudo = False):
610         out = err = ""
611         proc = None
612         status = self.status(pid, ppid)
613
614         if status == sshfuncs.ProcStatus.RUNNING:
615             if self.localhost:
616                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
617             else:
618                 with self._node_lock:
619                     (out, err), proc = sshfuncs.rkill(
620                         pid, ppid,
621                         host = self.get("hostname"),
622                         user = self.get("username"),
623                         port = self.get("port"),
624                         gwuser = self.get("gatewayUser"),
625                         gw = self.get("gateway"),
626                         agent = True,
627                         sudo = sudo,
628                         identity = self.get("identity"),
629                         server_key = self.get("serverKey")
630                         )
631
632         return (out, err), proc
633
634     def copy(self, src, dst):
635         if self.localhost:
636             (out, err), proc = execfuncs.lcopy(src, dst, 
637                     recursive = True)
638         else:
639             with self._node_lock:
640                 (out, err), proc = sshfuncs.rcopy(
641                     src, dst, 
642                     port = self.get("port"),
643                     gwuser = self.get("gatewayUser"),
644                     gw = self.get("gateway"),
645                     identity = self.get("identity"),
646                     server_key = self.get("serverKey"),
647                     recursive = True,
648                     strict_host_checking = False)
649
650         return (out, err), proc
651
652     def upload(self, src, dst, text = False, overwrite = True,
653             raise_on_error = True):
654         """ Copy content to destination
655
656         src  string with the content to copy. Can be:
657             - plain text
658             - a string with the path to a local file
659             - a string with a semi-colon separeted list of local files
660             - a string with a local directory
661
662         dst  string with destination path on the remote host (remote is 
663             always self.host)
664
665         text src is text input, it must be stored into a temp file before 
666         uploading
667         """
668         # If source is a string input 
669         f = None
670         if text and not os.path.isfile(src):
671             # src is text input that should be uploaded as file
672             # create a temporal file with the content to upload
673             f = tempfile.NamedTemporaryFile(delete=False)
674             f.write(src)
675             f.close()
676             src = f.name
677
678         # If dst files should not be overwritten, check that the files do not
679         # exits already
680         if isinstance(src, str):
681             src = map(str.strip, src.split(";"))
682     
683         if overwrite == False:
684             src = self.filter_existing_files(src, dst)
685             if not src:
686                 return ("", ""), None
687
688         if not self.localhost:
689             # Build destination as <user>@<server>:<path>
690             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
691
692         ((out, err), proc) = self.copy(src, dst)
693
694         # clean up temp file
695         if f:
696             os.remove(f.name)
697
698         if err:
699             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
700             self.error(msg, out, err)
701
702             if raise_on_error:
703                 raise RuntimeError, msg
704
705         return ((out, err), proc)
706
707     def download(self, src, dst, raise_on_error = True):
708         if not self.localhost:
709             # Build destination as <user>@<server>:<path>
710             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
711
712         ((out, err), proc) = self.copy(src, dst)
713
714         if err:
715             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
716             self.error(msg, out, err)
717
718             if raise_on_error:
719                 raise RuntimeError, msg
720
721         return ((out, err), proc)
722
723     def install_packages_command(self, packages):
724         command = ""
725         if self.use_rpm:
726             command = rpmfuncs.install_packages_command(self.os, packages)
727         elif self.use_deb:
728             command = debfuncs.install_packages_command(self.os, packages)
729         else:
730             msg = "Error installing packages ( OS not known ) "
731             self.error(msg, self.os)
732             raise RuntimeError, msg
733
734         return command
735
736     def install_packages(self, packages, home, run_home = None,
737             raise_on_error = True):
738         """ Install packages in the Linux host.
739
740         'home' is the directory to upload the package installation script.
741         'run_home' is the directory from where to execute the script.
742         """
743         command = self.install_packages_command(packages)
744
745         run_home = run_home or home
746
747         (out, err), proc = self.run_and_wait(command, run_home, 
748             shfile = os.path.join(home, "instpkg.sh"),
749             pidfile = "instpkg_pidfile",
750             ecodefile = "instpkg_exitcode",
751             stdout = "instpkg_stdout", 
752             stderr = "instpkg_stderr",
753             overwrite = False,
754             raise_on_error = raise_on_error)
755
756         return (out, err), proc 
757
758     def remove_packages(self, packages, home, run_home = None,
759             raise_on_error = True):
760         """ Uninstall packages from the Linux host.
761
762         'home' is the directory to upload the package un-installation script.
763         'run_home' is the directory from where to execute the script.
764         """
765         if self.use_rpm:
766             command = rpmfuncs.remove_packages_command(self.os, packages)
767         elif self.use_deb:
768             command = debfuncs.remove_packages_command(self.os, packages)
769         else:
770             msg = "Error removing packages ( OS not known ) "
771             self.error(msg)
772             raise RuntimeError, msg
773
774         run_home = run_home or home
775
776         (out, err), proc = self.run_and_wait(command, run_home, 
777             shfile = os.path.join(home, "rmpkg.sh"),
778             pidfile = "rmpkg_pidfile",
779             ecodefile = "rmpkg_exitcode",
780             stdout = "rmpkg_stdout", 
781             stderr = "rmpkg_stderr",
782             overwrite = False,
783             raise_on_error = raise_on_error)
784          
785         return (out, err), proc 
786
787     def mkdir(self, path, clean = False):
788         if clean:
789             self.rmdir(path)
790
791         return self.execute("mkdir -p %s" % path, with_lock = True)
792
793     def rmdir(self, path):
794         return self.execute("rm -rf %s" % path, with_lock = True)
795         
796     def run_and_wait(self, command, home, 
797             shfile = "cmd.sh",
798             env = None,
799             overwrite = True,
800             pidfile = "pidfile", 
801             ecodefile = "exitcode", 
802             stdin = None, 
803             stdout = "stdout", 
804             stderr = "stderr", 
805             sudo = False,
806             tty = False,
807             raise_on_error = True):
808         """
809         Uploads the 'command' to a bash script in the host.
810         Then runs the script detached in background in the host, and
811         busy-waites until the script finishes executing.
812         """
813
814         if not shfile.startswith("/"):
815             shfile = os.path.join(home, shfile)
816
817         self.upload_command(command, 
818             shfile = shfile, 
819             ecodefile = ecodefile, 
820             env = env,
821             overwrite = overwrite)
822
823         command = "bash %s" % shfile
824         # run command in background in remote host
825         (out, err), proc = self.run(command, home, 
826                 pidfile = pidfile,
827                 stdin = stdin, 
828                 stdout = stdout, 
829                 stderr = stderr, 
830                 sudo = sudo,
831                 tty = tty)
832
833         # check no errors occurred
834         if proc.poll():
835             msg = " Failed to run command '%s' " % command
836             self.error(msg, out, err)
837             if raise_on_error:
838                 raise RuntimeError, msg
839
840         # Wait for pid file to be generated
841         pid, ppid = self.wait_pid(
842                 home = home, 
843                 pidfile = pidfile, 
844                 raise_on_error = raise_on_error)
845
846         # wait until command finishes to execute
847         self.wait_run(pid, ppid)
848       
849         (eout, err), proc = self.check_errors(home,
850             ecodefile = ecodefile,
851             stderr = stderr)
852
853         # Out is what was written in the stderr file
854         if err:
855             msg = " Failed to run command '%s' " % command
856             self.error(msg, eout, err)
857
858             if raise_on_error:
859                 raise RuntimeError, msg
860
861         (out, oerr), proc = self.check_output(home, stdout)
862         
863         return (out, err), proc
864
865     def exitcode(self, home, ecodefile = "exitcode"):
866         """
867         Get the exit code of an application.
868         Returns an integer value with the exit code 
869         """
870         (out, err), proc = self.check_output(home, ecodefile)
871
872         # Succeeded to open file, return exit code in the file
873         if proc.wait() == 0:
874             try:
875                 return int(out.strip())
876             except:
877                 # Error in the content of the file!
878                 return ExitCode.CORRUPTFILE
879
880         # No such file or directory
881         if proc.returncode == 1:
882             return ExitCode.FILENOTFOUND
883         
884         # Other error from 'cat'
885         return ExitCode.ERROR
886
887     def upload_command(self, command, 
888             shfile = "cmd.sh",
889             ecodefile = "exitcode",
890             overwrite = True,
891             env = None):
892         """ Saves the command as a bash script file in the remote host, and
893         forces to save the exit code of the command execution to the ecodefile
894         """
895
896         if not (command.strip().endswith(";") or command.strip().endswith("&")):
897             command += ";"
898       
899         # The exit code of the command will be stored in ecodefile
900         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
901                 'command': command,
902                 'ecodefile': ecodefile,
903                 } 
904
905         # Export environment
906         environ = self.format_environment(env)
907
908         # Add environ to command
909         command = environ + command
910
911         return self.upload(command, shfile, text = True, overwrite = overwrite)
912
913     def format_environment(self, env, inline = False):
914         """ Formats the environment variables for a command to be executed
915         either as an inline command
916         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
917         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
918         """
919         if not env: return ""
920
921         # Remove extra white spaces
922         env = re.sub(r'\s+', ' ', env.strip())
923
924         sep = ";" if inline else "\n"
925         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
926
927     def check_errors(self, home, 
928             ecodefile = "exitcode", 
929             stderr = "stderr"):
930         """ Checks whether errors occurred while running a command.
931         It first checks the exit code for the command, and only if the
932         exit code is an error one it returns the error output.
933
934         """
935         proc = None
936         err = ""
937
938         # get exit code saved in the 'exitcode' file
939         ecode = self.exitcode(home, ecodefile)
940
941         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
942             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
943         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
944             # The process returned an error code or didn't exist. 
945             # Check standard error.
946             (err, eerr), proc = self.check_output(home, stderr)
947
948             # If the stderr file was not found, assume nothing bad happened,
949             # and just ignore the error.
950             # (cat returns 1 for error "No such file or directory")
951             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
952                 err = "" 
953             
954         return ("", err), proc
955  
956     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
957         """ Waits until the pid file for the command is generated, 
958             and returns the pid and ppid of the process """
959         pid = ppid = None
960         delay = 1.0
961
962         for i in xrange(2):
963             pidtuple = self.getpid(home = home, pidfile = pidfile)
964             
965             if pidtuple:
966                 pid, ppid = pidtuple
967                 break
968             else:
969                 time.sleep(delay)
970                 delay = delay * 1.5
971         else:
972             msg = " Failed to get pid for pidfile %s/%s " % (
973                     home, pidfile )
974             self.error(msg)
975             
976             if raise_on_error:
977                 raise RuntimeError, msg
978
979         return pid, ppid
980
981     def wait_run(self, pid, ppid, trial = 0):
982         """ wait for a remote process to finish execution """
983         delay = 1.0
984
985         while True:
986             status = self.status(pid, ppid)
987             
988             if status is ProcStatus.FINISHED:
989                 break
990             elif status is not ProcStatus.RUNNING:
991                 delay = delay * 1.5
992                 time.sleep(delay)
993                 # If it takes more than 20 seconds to start, then
994                 # asume something went wrong
995                 if delay > 20:
996                     break
997             else:
998                 # The app is running, just wait...
999                 time.sleep(0.5)
1000
1001     def check_output(self, home, filename):
1002         """ Retrives content of file """
1003         (out, err), proc = self.execute("cat %s" % 
1004             os.path.join(home, filename), retry = 1, with_lock = True)
1005         return (out, err), proc
1006
1007     def is_alive(self):
1008         """ Checks if host is responsive
1009         """
1010         if self.localhost:
1011             return True
1012
1013         out = err = ""
1014         msg = "Unresponsive host. Wrong answer. "
1015
1016         # The underlying SSH layer will sometimes return an empty
1017         # output (even if the command was executed without errors).
1018         # To work arround this, repeat the operation N times or
1019         # until the result is not empty string
1020         try:
1021             (out, err), proc = self.execute("echo 'ALIVE'",
1022                     blocking = True,
1023                     with_lock = True)
1024     
1025             if out.find("ALIVE") > -1:
1026                 return True
1027         except:
1028             trace = traceback.format_exc()
1029             msg = "Unresponsive host. Error reaching host: %s " % trace
1030
1031         self.error(msg, out, err)
1032         return False
1033
1034     def find_home(self):
1035         """ Retrieves host home directory
1036         """
1037         # The underlying SSH layer will sometimes return an empty
1038         # output (even if the command was executed without errors).
1039         # To work arround this, repeat the operation N times or
1040         # until the result is not empty string
1041         msg = "Impossible to retrieve HOME directory"
1042         try:
1043             (out, err), proc = self.execute("echo ${HOME}",
1044                     blocking = True,
1045                     with_lock = True)
1046     
1047             if out.strip() != "":
1048                 self._home_dir =  out.strip()
1049         except:
1050             trace = traceback.format_exc()
1051             msg = "Impossible to retrieve HOME directory %s" % trace
1052
1053         if not self._home_dir:
1054             self.error(msg)
1055             raise RuntimeError, msg
1056
1057     def filter_existing_files(self, src, dst):
1058         """ Removes files that already exist in the Linux host from src list
1059         """
1060         # construct a dictionary with { dst: src }
1061         dests = dict(map(
1062             lambda s: (os.path.join(dst, os.path.basename(s)), s ), s)) \
1063                     if len(src) > 1 else dict({dst: src[0]})
1064
1065         command = []
1066         for d in dests.keys():
1067             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1068
1069         command = ";".join(command)
1070
1071         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1072     
1073         for d in dests.keys():
1074             if out.find(d) > -1:
1075                 del dests[d]
1076
1077         if not dests:
1078             return []
1079
1080         return dests.values()
1081