Adding sfa support ple using hostname
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.Design)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.Design)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.Design)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.Design)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.Design)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.Design)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.Design)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.Design)
190
191         gateway_user = Attribute("gatewayUser", "Gateway account username",
192                 flags = Flags.Design)
193
194         gateway = Attribute("gateway", "Hostname of the gateway machine",
195                 flags = Flags.Design)
196
197         cls._register_attribute(hostname)
198         cls._register_attribute(username)
199         cls._register_attribute(port)
200         cls._register_attribute(home)
201         cls._register_attribute(identity)
202         cls._register_attribute(server_key)
203         cls._register_attribute(clean_home)
204         cls._register_attribute(clean_experiment)
205         cls._register_attribute(clean_processes)
206         cls._register_attribute(tear_down)
207         cls._register_attribute(gateway_user)
208         cls._register_attribute(gateway)
209
210     def __init__(self, ec, guid):
211         super(LinuxNode, self).__init__(ec, guid)
212         self._os = None
213         # home directory at Linux host
214         self._home_dir = ""
215
216         # lock to prevent concurrent applications on the same node,
217         # to execute commands at the same time. There are potential
218         # concurrency issues when using SSH to a same host from 
219         # multiple threads. There are also possible operational 
220         # issues, e.g. an application querying the existence 
221         # of a file or folder prior to its creation, and another 
222         # application creating the same file or folder in between.
223         self._node_lock = threading.Lock()
224     
225     def log_message(self, msg):
226         return " guid %d - host %s - %s " % (self.guid, 
227                 self.get("hostname"), msg)
228
229     @property
230     def home_dir(self):
231         home = self.get("home") or ""
232         if not home.startswith("/"):
233            home = os.path.join(self._home_dir, home) 
234         return home
235
236     @property
237     def nepi_home(self):
238         return os.path.join(self.home_dir, ".nepi")
239
240     @property
241     def usr_dir(self):
242         return os.path.join(self.nepi_home, "nepi-usr")
243
244     @property
245     def lib_dir(self):
246         return os.path.join(self.usr_dir, "lib")
247
248     @property
249     def bin_dir(self):
250         return os.path.join(self.usr_dir, "bin")
251
252     @property
253     def src_dir(self):
254         return os.path.join(self.usr_dir, "src")
255
256     @property
257     def share_dir(self):
258         return os.path.join(self.usr_dir, "share")
259
260     @property
261     def exp_dir(self):
262         return os.path.join(self.nepi_home, "nepi-exp")
263
264     @property
265     def exp_home(self):
266         return os.path.join(self.exp_dir, self.ec.exp_id)
267
268     @property
269     def node_home(self):
270         return os.path.join(self.exp_home, "node-%d" % self.guid)
271
272     @property
273     def run_home(self):
274         return os.path.join(self.node_home, self.ec.run_id)
275
276     @property
277     def os(self):
278         if self._os:
279             return self._os
280
281         if self.get("hostname") not in ["localhost", "127.0.0.1"] and \
282                 not self.get("username"):
283             msg = "Can't resolve OS, insufficient data "
284             self.error(msg)
285             raise RuntimeError, msg
286
287         out = self.get_os()
288
289         if out.find("Fedora release 8") == 0:
290             self._os = OSType.FEDORA_8
291         elif out.find("Fedora release 12") == 0:
292             self._os = OSType.FEDORA_12
293         elif out.find("Fedora release 14") == 0:
294             self._os = OSType.FEDORA_14
295         elif out.find("Fedora release") == 0:
296             self._os = OSType.FEDORA
297         elif out.find("Debian") == 0: 
298             self._os = OSType.DEBIAN
299         elif out.find("Ubuntu") ==0:
300             self._os = OSType.UBUNTU
301         else:
302             msg = "Unsupported OS"
303             self.error(msg, out)
304             raise RuntimeError, "%s - %s " %( msg, out )
305
306         return self._os
307
308     def get_os(self):
309         # The underlying SSH layer will sometimes return an empty
310         # output (even if the command was executed without errors).
311         # To work arround this, repeat the operation N times or
312         # until the result is not empty string
313         out = ""
314         try:
315             (out, err), proc = self.execute("cat /etc/issue", 
316                     with_lock = True,
317                     blocking = True)
318         except:
319             trace = traceback.format_exc()
320             msg = "Error detecting OS: %s " % trace
321             self.error(msg, out, err)
322         
323         return out
324
325     @property
326     def use_deb(self):
327         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
328
329     @property
330     def use_rpm(self):
331         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
332                 OSType.FEDORA]
333
334     @property
335     def localhost(self):
336         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
337
338     def do_provision(self):
339         # check if host is alive
340         if not self.is_alive():
341             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
342             self.error(msg)
343             raise RuntimeError, msg
344
345         self.find_home()
346
347         if self.get("cleanProcesses"):
348             self.clean_processes()
349
350         if self.get("cleanHome"):
351             self.clean_home()
352  
353         if self.get("cleanExperiment"):
354             self.clean_experiment()
355     
356         # Create shared directory structure and node home directory
357         paths = [self.lib_dir, 
358             self.bin_dir, 
359             self.src_dir, 
360             self.share_dir, 
361             self.node_home]
362
363         self.mkdir(paths)
364
365         super(LinuxNode, self).do_provision()
366
367     def do_deploy(self):
368         if self.state == ResourceState.NEW:
369             self.info("Deploying node")
370             self.do_discover()
371             self.do_provision()
372
373         # Node needs to wait until all associated interfaces are 
374         # ready before it can finalize deployment
375         from nepi.resources.linux.interface import LinuxInterface
376         ifaces = self.get_connected(LinuxInterface.get_rtype())
377         for iface in ifaces:
378             if iface.state < ResourceState.READY:
379                 self.ec.schedule(reschedule_delay, self.deploy)
380                 return 
381
382         super(LinuxNode, self).do_deploy()
383
384     def do_release(self):
385         rms = self.get_connected()
386         for rm in rms:
387             # Node needs to wait until all associated RMs are released
388             # before it can be released
389             if rm.state != ResourceState.RELEASED:
390                 self.ec.schedule(reschedule_delay, self.release)
391                 return 
392
393         tear_down = self.get("tearDown")
394         if tear_down:
395             self.execute(tear_down)
396
397         self.clean_processes()
398
399         super(LinuxNode, self).do_release()
400
401     def valid_connection(self, guid):
402         # TODO: Validate!
403         return True
404
405     def clean_processes(self):
406         self.info("Cleaning up processes")
407  
408         if self.get("hostname") in ["localhost", "127.0.0.2"]:
409             return 
410         
411         if self.get("username") != 'root':
412             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
413                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
414         else:
415             if self.state >= ResourceState.READY:
416                 import pickle
417                 pids = pickle.load(open("/tmp/save.proc", "rb"))
418                 pids_temp = dict()
419                 ps_aux = "ps aux |awk '{print $2,$11}'"
420                 (out, err), proc = self.execute(ps_aux)
421                 for line in out.strip().split("\n"):
422                     parts = line.strip().split(" ")
423                     pids_temp[parts[0]] = parts[1]
424                 kill_pids = set(pids_temp.items()) - set(pids.items())
425                 kill_pids = ' '.join(dict(kill_pids).keys())
426
427                 cmd = ("killall tcpdump || /bin/true ; " +
428                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
429                     "kill %s || /bin/true ; " % kill_pids)
430             else:
431                 cmd = ("killall tcpdump || /bin/true ; " +
432                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
433
434         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
435
436     def clean_home(self):
437         """ Cleans all NEPI related folders in the Linux host
438         """
439         self.info("Cleaning up home")
440         
441         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
442                 self.home_dir )
443
444         return self.execute(cmd, with_lock = True)
445
446     def clean_experiment(self):
447         """ Cleans all experiment related files in the Linux host.
448         It preserves NEPI files and folders that have a multi experiment
449         scope.
450         """
451         self.info("Cleaning up experiment files")
452         
453         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
454                 self.exp_dir,
455                 self.ec.exp_id )
456             
457         return self.execute(cmd, with_lock = True)
458
459     def execute(self, command,
460             sudo = False,
461             env = None,
462             tty = False,
463             forward_x11 = False,
464             retry = 3,
465             connect_timeout = 30,
466             strict_host_checking = False,
467             persistent = True,
468             blocking = True,
469             with_lock = False
470             ):
471         """ Notice that this invocation will block until the
472         execution finishes. If this is not the desired behavior,
473         use 'run' instead."""
474
475         if self.localhost:
476             (out, err), proc = execfuncs.lexec(command, 
477                     user = self.get("username"), # still problem with localhost
478                     sudo = sudo,
479                     env = env)
480         else:
481             if with_lock:
482                 # If the execute command is blocking, we don't want to keep
483                 # the node lock. This lock is used to avoid race conditions
484                 # when creating the ControlMaster sockets. A more elegant
485                 # solution is needed.
486                 with self._node_lock:
487                     (out, err), proc = sshfuncs.rexec(
488                         command, 
489                         host = self.get("hostname"),
490                         user = self.get("username"),
491                         port = self.get("port"),
492                         gwuser = self.get("gatewayUser"),
493                         gw = self.get("gateway"),
494                         agent = True,
495                         sudo = sudo,
496                         identity = self.get("identity"),
497                         server_key = self.get("serverKey"),
498                         env = env,
499                         tty = tty,
500                         forward_x11 = forward_x11,
501                         retry = retry,
502                         connect_timeout = connect_timeout,
503                         persistent = persistent,
504                         blocking = blocking, 
505                         strict_host_checking = strict_host_checking
506                         )
507             else:
508                 (out, err), proc = sshfuncs.rexec(
509                     command, 
510                     host = self.get("hostname"),
511                     user = self.get("username"),
512                     port = self.get("port"),
513                     gwuser = self.get("gatewayUser"),
514                     gw = self.get("gateway"),
515                     agent = True,
516                     sudo = sudo,
517                     identity = self.get("identity"),
518                     server_key = self.get("serverKey"),
519                     env = env,
520                     tty = tty,
521                     forward_x11 = forward_x11,
522                     retry = retry,
523                     connect_timeout = connect_timeout,
524                     persistent = persistent,
525                     blocking = blocking, 
526                     strict_host_checking = strict_host_checking
527                     )
528
529         return (out, err), proc
530
531     def run(self, command, home,
532             create_home = False,
533             pidfile = 'pidfile',
534             stdin = None, 
535             stdout = 'stdout', 
536             stderr = 'stderr', 
537             sudo = False,
538             tty = False):
539         
540         self.debug("Running command '%s'" % command)
541         
542         if self.localhost:
543             (out, err), proc = execfuncs.lspawn(command, pidfile,
544                     home = home, 
545                     create_home = create_home, 
546                     stdin = stdin or '/dev/null',
547                     stdout = stdout or '/dev/null',
548                     stderr = stderr or '/dev/null',
549                     sudo = sudo) 
550         else:
551             with self._node_lock:
552                 (out, err), proc = sshfuncs.rspawn(
553                     command,
554                     pidfile = pidfile,
555                     home = home,
556                     create_home = create_home,
557                     stdin = stdin or '/dev/null',
558                     stdout = stdout or '/dev/null',
559                     stderr = stderr or '/dev/null',
560                     sudo = sudo,
561                     host = self.get("hostname"),
562                     user = self.get("username"),
563                     port = self.get("port"),
564                     gwuser = self.get("gatewayUser"),
565                     gw = self.get("gateway"),
566                     agent = True,
567                     identity = self.get("identity"),
568                     server_key = self.get("serverKey"),
569                     tty = tty
570                     )
571
572         return (out, err), proc
573
574     def getpid(self, home, pidfile = "pidfile"):
575         if self.localhost:
576             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
577         else:
578             with self._node_lock:
579                 pidtuple = sshfuncs.rgetpid(
580                     os.path.join(home, pidfile),
581                     host = self.get("hostname"),
582                     user = self.get("username"),
583                     port = self.get("port"),
584                     gwuser = self.get("gatewayUser"),
585                     gw = self.get("gateway"),
586                     agent = True,
587                     identity = self.get("identity"),
588                     server_key = self.get("serverKey")
589                     )
590         
591         return pidtuple
592
593     def status(self, pid, ppid):
594         if self.localhost:
595             status = execfuncs.lstatus(pid, ppid)
596         else:
597             with self._node_lock:
598                 status = sshfuncs.rstatus(
599                         pid, ppid,
600                         host = self.get("hostname"),
601                         user = self.get("username"),
602                         port = self.get("port"),
603                         gwuser = self.get("gatewayUser"),
604                         gw = self.get("gateway"),
605                         agent = True,
606                         identity = self.get("identity"),
607                         server_key = self.get("serverKey")
608                         )
609            
610         return status
611     
612     def kill(self, pid, ppid, sudo = False):
613         out = err = ""
614         proc = None
615         status = self.status(pid, ppid)
616
617         if status == sshfuncs.ProcStatus.RUNNING:
618             if self.localhost:
619                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
620             else:
621                 with self._node_lock:
622                     (out, err), proc = sshfuncs.rkill(
623                         pid, ppid,
624                         host = self.get("hostname"),
625                         user = self.get("username"),
626                         port = self.get("port"),
627                         gwuser = self.get("gatewayUser"),
628                         gw = self.get("gateway"),
629                         agent = True,
630                         sudo = sudo,
631                         identity = self.get("identity"),
632                         server_key = self.get("serverKey")
633                         )
634
635         return (out, err), proc
636
637     def copy(self, src, dst):
638         if self.localhost:
639             (out, err), proc = execfuncs.lcopy(src, dst, 
640                     recursive = True)
641         else:
642             with self._node_lock:
643                 (out, err), proc = sshfuncs.rcopy(
644                     src, dst, 
645                     port = self.get("port"),
646                     gwuser = self.get("gatewayUser"),
647                     gw = self.get("gateway"),
648                     identity = self.get("identity"),
649                     server_key = self.get("serverKey"),
650                     recursive = True,
651                     strict_host_checking = False)
652
653         return (out, err), proc
654
655     def upload(self, src, dst, text = False, overwrite = True,
656             raise_on_error = True):
657         """ Copy content to destination
658
659         src  string with the content to copy. Can be:
660             - plain text
661             - a string with the path to a local file
662             - a string with a semi-colon separeted list of local files
663             - a string with a local directory
664
665         dst  string with destination path on the remote host (remote is 
666             always self.host)
667
668         text src is text input, it must be stored into a temp file before 
669         uploading
670         """
671         # If source is a string input 
672         f = None
673         if text and not os.path.isfile(src):
674             # src is text input that should be uploaded as file
675             # create a temporal file with the content to upload
676             f = tempfile.NamedTemporaryFile(delete=False)
677             f.write(src)
678             f.close()
679             src = f.name
680
681         # If dst files should not be overwritten, check that the files do not
682         # exits already
683         if isinstance(src, str):
684             src = map(os.path.expanduser, map(str.strip, src.split(";")))
685     
686         if overwrite == False:
687             src = self.filter_existing_files(src, dst)
688             if not src:
689                 return ("", ""), None
690
691         if not self.localhost:
692             # Build destination as <user>@<server>:<path>
693             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
694
695         ((out, err), proc) = self.copy(src, dst)
696
697         # clean up temp file
698         if f:
699             os.remove(f.name)
700
701         if err:
702             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
703             self.error(msg, out, err)
704
705             if raise_on_error:
706                 raise RuntimeError, msg
707
708         return ((out, err), proc)
709
710     def download(self, src, dst, raise_on_error = True):
711         if not self.localhost:
712             # Build destination as <user>@<server>:<path>
713             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
714
715         ((out, err), proc) = self.copy(src, dst)
716
717         if err:
718             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
719             self.error(msg, out, err)
720
721             if raise_on_error:
722                 raise RuntimeError, msg
723
724         return ((out, err), proc)
725
726     def install_packages_command(self, packages):
727         command = ""
728         if self.use_rpm:
729             command = rpmfuncs.install_packages_command(self.os, packages)
730         elif self.use_deb:
731             command = debfuncs.install_packages_command(self.os, packages)
732         else:
733             msg = "Error installing packages ( OS not known ) "
734             self.error(msg, self.os)
735             raise RuntimeError, msg
736
737         return command
738
739     def install_packages(self, packages, home, run_home = None,
740             raise_on_error = True):
741         """ Install packages in the Linux host.
742
743         'home' is the directory to upload the package installation script.
744         'run_home' is the directory from where to execute the script.
745         """
746         command = self.install_packages_command(packages)
747
748         run_home = run_home or home
749
750         (out, err), proc = self.run_and_wait(command, run_home, 
751             shfile = os.path.join(home, "instpkg.sh"),
752             pidfile = "instpkg_pidfile",
753             ecodefile = "instpkg_exitcode",
754             stdout = "instpkg_stdout", 
755             stderr = "instpkg_stderr",
756             overwrite = False,
757             raise_on_error = raise_on_error)
758
759         return (out, err), proc 
760
761     def remove_packages(self, packages, home, run_home = None,
762             raise_on_error = True):
763         """ Uninstall packages from the Linux host.
764
765         'home' is the directory to upload the package un-installation script.
766         'run_home' is the directory from where to execute the script.
767         """
768         if self.use_rpm:
769             command = rpmfuncs.remove_packages_command(self.os, packages)
770         elif self.use_deb:
771             command = debfuncs.remove_packages_command(self.os, packages)
772         else:
773             msg = "Error removing packages ( OS not known ) "
774             self.error(msg)
775             raise RuntimeError, msg
776
777         run_home = run_home or home
778
779         (out, err), proc = self.run_and_wait(command, run_home, 
780             shfile = os.path.join(home, "rmpkg.sh"),
781             pidfile = "rmpkg_pidfile",
782             ecodefile = "rmpkg_exitcode",
783             stdout = "rmpkg_stdout", 
784             stderr = "rmpkg_stderr",
785             overwrite = False,
786             raise_on_error = raise_on_error)
787          
788         return (out, err), proc 
789
790     def mkdir(self, paths, clean = False):
791         """ Paths is either a single remote directory path to create,
792         or a list of directories to create.
793         """
794         if clean:
795             self.rmdir(paths)
796
797         if isinstance(paths, str):
798             paths = [paths]
799
800         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
801
802         return self.execute(cmd, with_lock = True)
803
804     def rmdir(self, paths):
805         """ Paths is either a single remote directory path to delete,
806         or a list of directories to delete.
807         """
808
809         if isinstance(paths, str):
810             paths = [paths]
811
812         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
813
814         return self.execute(cmd, with_lock = True)
815         
816     def run_and_wait(self, command, home, 
817             shfile = "cmd.sh",
818             env = None,
819             overwrite = True,
820             pidfile = "pidfile", 
821             ecodefile = "exitcode", 
822             stdin = None, 
823             stdout = "stdout", 
824             stderr = "stderr", 
825             sudo = False,
826             tty = False,
827             raise_on_error = True):
828         """
829         Uploads the 'command' to a bash script in the host.
830         Then runs the script detached in background in the host, and
831         busy-waites until the script finishes executing.
832         """
833
834         if not shfile.startswith("/"):
835             shfile = os.path.join(home, shfile)
836
837         self.upload_command(command, 
838             shfile = shfile, 
839             ecodefile = ecodefile, 
840             env = env,
841             overwrite = overwrite)
842
843         command = "bash %s" % shfile
844         # run command in background in remote host
845         (out, err), proc = self.run(command, home, 
846                 pidfile = pidfile,
847                 stdin = stdin, 
848                 stdout = stdout, 
849                 stderr = stderr, 
850                 sudo = sudo,
851                 tty = tty)
852
853         # check no errors occurred
854         if proc.poll():
855             msg = " Failed to run command '%s' " % command
856             self.error(msg, out, err)
857             if raise_on_error:
858                 raise RuntimeError, msg
859
860         # Wait for pid file to be generated
861         pid, ppid = self.wait_pid(
862                 home = home, 
863                 pidfile = pidfile, 
864                 raise_on_error = raise_on_error)
865
866         # wait until command finishes to execute
867         self.wait_run(pid, ppid)
868       
869         (eout, err), proc = self.check_errors(home,
870             ecodefile = ecodefile,
871             stderr = stderr)
872
873         # Out is what was written in the stderr file
874         if err:
875             msg = " Failed to run command '%s' " % command
876             self.error(msg, eout, err)
877
878             if raise_on_error:
879                 raise RuntimeError, msg
880
881         (out, oerr), proc = self.check_output(home, stdout)
882         
883         return (out, err), proc
884
885     def exitcode(self, home, ecodefile = "exitcode"):
886         """
887         Get the exit code of an application.
888         Returns an integer value with the exit code 
889         """
890         (out, err), proc = self.check_output(home, ecodefile)
891
892         # Succeeded to open file, return exit code in the file
893         if proc.wait() == 0:
894             try:
895                 return int(out.strip())
896             except:
897                 # Error in the content of the file!
898                 return ExitCode.CORRUPTFILE
899
900         # No such file or directory
901         if proc.returncode == 1:
902             return ExitCode.FILENOTFOUND
903         
904         # Other error from 'cat'
905         return ExitCode.ERROR
906
907     def upload_command(self, command, 
908             shfile = "cmd.sh",
909             ecodefile = "exitcode",
910             overwrite = True,
911             env = None):
912         """ Saves the command as a bash script file in the remote host, and
913         forces to save the exit code of the command execution to the ecodefile
914         """
915
916         if not (command.strip().endswith(";") or command.strip().endswith("&")):
917             command += ";"
918       
919         # The exit code of the command will be stored in ecodefile
920         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
921                 'command': command,
922                 'ecodefile': ecodefile,
923                 } 
924
925         # Export environment
926         environ = self.format_environment(env)
927
928         # Add environ to command
929         command = environ + command
930
931         return self.upload(command, shfile, text = True, overwrite = overwrite)
932
933     def format_environment(self, env, inline = False):
934         """ Formats the environment variables for a command to be executed
935         either as an inline command
936         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
937         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
938         """
939         if not env: return ""
940
941         # Remove extra white spaces
942         env = re.sub(r'\s+', ' ', env.strip())
943
944         sep = ";" if inline else "\n"
945         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
946
947     def check_errors(self, home, 
948             ecodefile = "exitcode", 
949             stderr = "stderr"):
950         """ Checks whether errors occurred while running a command.
951         It first checks the exit code for the command, and only if the
952         exit code is an error one it returns the error output.
953
954         """
955         proc = None
956         err = ""
957
958         # get exit code saved in the 'exitcode' file
959         ecode = self.exitcode(home, ecodefile)
960
961         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
962             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
963         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
964             # The process returned an error code or didn't exist. 
965             # Check standard error.
966             (err, eerr), proc = self.check_output(home, stderr)
967
968             # If the stderr file was not found, assume nothing bad happened,
969             # and just ignore the error.
970             # (cat returns 1 for error "No such file or directory")
971             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
972                 err = "" 
973             
974         return ("", err), proc
975  
976     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
977         """ Waits until the pid file for the command is generated, 
978             and returns the pid and ppid of the process """
979         pid = ppid = None
980         delay = 1.0
981
982         for i in xrange(2):
983             pidtuple = self.getpid(home = home, pidfile = pidfile)
984             
985             if pidtuple:
986                 pid, ppid = pidtuple
987                 break
988             else:
989                 time.sleep(delay)
990                 delay = delay * 1.5
991         else:
992             msg = " Failed to get pid for pidfile %s/%s " % (
993                     home, pidfile )
994             self.error(msg)
995             
996             if raise_on_error:
997                 raise RuntimeError, msg
998
999         return pid, ppid
1000
1001     def wait_run(self, pid, ppid, trial = 0):
1002         """ wait for a remote process to finish execution """
1003         delay = 1.0
1004
1005         while True:
1006             status = self.status(pid, ppid)
1007             
1008             if status is ProcStatus.FINISHED:
1009                 break
1010             elif status is not ProcStatus.RUNNING:
1011                 delay = delay * 1.5
1012                 time.sleep(delay)
1013                 # If it takes more than 20 seconds to start, then
1014                 # asume something went wrong
1015                 if delay > 20:
1016                     break
1017             else:
1018                 # The app is running, just wait...
1019                 time.sleep(0.5)
1020
1021     def check_output(self, home, filename):
1022         """ Retrives content of file """
1023         (out, err), proc = self.execute("cat %s" % 
1024             os.path.join(home, filename), retry = 1, with_lock = True)
1025         return (out, err), proc
1026
1027     def is_alive(self):
1028         """ Checks if host is responsive
1029         """
1030         if self.localhost:
1031             return True
1032
1033         out = err = ""
1034         msg = "Unresponsive host. Wrong answer. "
1035
1036         # The underlying SSH layer will sometimes return an empty
1037         # output (even if the command was executed without errors).
1038         # To work arround this, repeat the operation N times or
1039         # until the result is not empty string
1040         try:
1041             (out, err), proc = self.execute("echo 'ALIVE'",
1042                     blocking = True,
1043                     with_lock = True)
1044     
1045             if out.find("ALIVE") > -1:
1046                 return True
1047         except:
1048             trace = traceback.format_exc()
1049             msg = "Unresponsive host. Error reaching host: %s " % trace
1050
1051         self.error(msg, out, err)
1052         return False
1053
1054     def find_home(self):
1055         """ Retrieves host home directory
1056         """
1057         # The underlying SSH layer will sometimes return an empty
1058         # output (even if the command was executed without errors).
1059         # To work arround this, repeat the operation N times or
1060         # until the result is not empty string
1061         msg = "Impossible to retrieve HOME directory"
1062         try:
1063             (out, err), proc = self.execute("echo ${HOME}",
1064                     blocking = True,
1065                     with_lock = True)
1066     
1067             if out.strip() != "":
1068                 self._home_dir =  out.strip()
1069         except:
1070             trace = traceback.format_exc()
1071             msg = "Impossible to retrieve HOME directory %s" % trace
1072
1073         if not self._home_dir:
1074             self.error(msg)
1075             raise RuntimeError, msg
1076
1077     def filter_existing_files(self, src, dst):
1078         """ Removes files that already exist in the Linux host from src list
1079         """
1080         # construct a dictionary with { dst: src }
1081         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1082                     if len(src) > 1 else dict({dst: src[0]})
1083
1084         command = []
1085         for d in dests.keys():
1086             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1087
1088         command = ";".join(command)
1089
1090         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1091     
1092         for d in dests.keys():
1093             if out.find(d) > -1:
1094                 del dests[d]
1095
1096         if not dests:
1097             return []
1098
1099         return dests.values()
1100