Linux/Ns-3/Dce cross experiments
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import socket
32 import tempfile
33 import time
34 import threading
35 import traceback
36
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!! 
39
40 class ExitCode:
41     """
42     Error codes that the rexitcode function can return if unable to
43     check the exit code of a spawned process
44     """
45     FILENOTFOUND = -1
46     CORRUPTFILE = -2
47     ERROR = -3
48     OK = 0
49
50 class OSType:
51     """
52     Supported flavors of Linux OS
53     """
54     FEDORA_8 = "f8"
55     FEDORA_12 = "f12"
56     FEDORA_14 = "f14"
57     FEDORA = "fedora"
58     UBUNTU = "ubuntu"
59     DEBIAN = "debian"
60
61 @clsinit_copy
62 class LinuxNode(ResourceManager):
63     """
64     .. class:: Class Args :
65       
66         :param ec: The Experiment controller
67         :type ec: ExperimentController
68         :param guid: guid of the RM
69         :type guid: int
70
71     .. note::
72
73         There are different ways in which commands can be executed using the
74         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
75         'run_and_wait'). 
76         
77         Brief explanation:
78
79             * 'execute' (blocking mode) :  
80
81                      HOW IT WORKS: 'execute', forks a process and run the
82                      command, synchronously, attached to the terminal, in
83                      foreground.
84                      The execute method will block until the command returns
85                      the result on 'out', 'err' (so until it finishes executing).
86   
87                      USAGE: short-lived commands that must be executed attached
88                      to a terminal and in foreground, for which it IS necessary
89                      to block until the command has finished (e.g. if you want
90                      to run 'ls' or 'cat').
91
92             * 'execute' (NON blocking mode - blocking = False) :
93
94                     HOW IT WORKS: Same as before, except that execute method
95                     will return immediately (even if command still running).
96
97                     USAGE: long-lived commands that must be executed attached
98                     to a terminal and in foreground, but for which it is not
99                     necessary to block until the command has finished. (e.g.
100                     start an application using X11 forwarding)
101
102              * 'run' :
103
104                    HOW IT WORKS: Connects to the host ( using SSH if remote)
105                    and launches the command in background, detached from any
106                    terminal (daemonized), and returns. The command continues to
107                    run remotely, but since it is detached from the terminal,
108                    its pipes (stdin, stdout, stderr) can't be redirected to the
109                    console (as normal non detached processes would), and so they
110                    are explicitly redirected to files. The pidfile is created as
111                    part of the process of launching the command. The pidfile
112                    holds the pid and ppid of the process forked in background,
113                    so later on it is possible to check whether the command is still
114                    running.
115
116                     USAGE: long-lived commands that can run detached in background,
117                     for which it is NOT necessary to block (wait) until the command
118                     has finished. (e.g. start an application that is not using X11
119                     forwarding. It can run detached and remotely in background)
120
121              * 'run_and_wait' :
122
123                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124                     the command has finished execution. It also checks whether
125                     errors occurred during runtime by reading the exitcode file,
126                     which contains the exit code of the command that was run
127                     (checking stderr only is not always reliable since many
128                     commands throw debugging info to stderr and the only way to
129                     automatically know whether an error really happened is to
130                     check the process exit code).
131
132                     Another difference with respect to 'run', is that instead
133                     of directly executing the command as a bash command line,
134                     it uploads the command to a bash script and runs the script.
135                     This allows to use the bash script to debug errors, since
136                     it remains at the remote host and can be run manually to
137                     reproduce the error.
138                   
139                     USAGE: medium-lived commands that can run detached in
140                     background, for which it IS necessary to block (wait) until
141                     the command has finished. (e.g. Package installation,
142                     source compilation, file download, etc)
143
144     """
145     _rtype = "linux::Node"
146     _help = "Controls Linux host machines ( either localhost or a host " \
147             "that can be accessed using a SSH key)"
148     _platform = "linux"
149
150     @classmethod
151     def _register_attributes(cls):
152         hostname = Attribute("hostname", "Hostname of the machine",
153                 flags = Flags.Design)
154
155         username = Attribute("username", "Local account username", 
156                 flags = Flags.Credential)
157
158         port = Attribute("port", "SSH port", flags = Flags.Design)
159         
160         home = Attribute("home",
161                 "Experiment home directory to store all experiment related files",
162                 flags = Flags.Design)
163         
164         identity = Attribute("identity", "SSH identity file",
165                 flags = Flags.Credential)
166         
167         server_key = Attribute("serverKey", "Server public key", 
168                 flags = Flags.Design)
169         
170         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
171                 " from node home folder before starting experiment", 
172                 type = Types.Bool,
173                 default = False,
174                 flags = Flags.Design)
175
176         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
177                 " from a previous same experiment, before the new experiment starts", 
178                 type = Types.Bool,
179                 default = False,
180                 flags = Flags.Design)
181         
182         clean_processes = Attribute("cleanProcesses", 
183                 "Kill all running processes before starting experiment",
184                 type = Types.Bool,
185                 default = False,
186                 flags = Flags.Design)
187         
188         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
189                 "releasing the resource",
190                 flags = Flags.Design)
191
192         gateway_user = Attribute("gatewayUser", "Gateway account username",
193                 flags = Flags.Design)
194
195         gateway = Attribute("gateway", "Hostname of the gateway machine",
196                 flags = Flags.Design)
197
198         ip = Attribute("ip", "Linux host public IP address. "
199                    "Must not be modified by the user unless hostname is 'localhost'",
200                     flags = Flags.Design)
201
202         cls._register_attribute(hostname)
203         cls._register_attribute(username)
204         cls._register_attribute(port)
205         cls._register_attribute(home)
206         cls._register_attribute(identity)
207         cls._register_attribute(server_key)
208         cls._register_attribute(clean_home)
209         cls._register_attribute(clean_experiment)
210         cls._register_attribute(clean_processes)
211         cls._register_attribute(tear_down)
212         cls._register_attribute(gateway_user)
213         cls._register_attribute(gateway)
214         cls._register_attribute(ip)
215
216     def __init__(self, ec, guid):
217         super(LinuxNode, self).__init__(ec, guid)
218         self._os = None
219         # home directory at Linux host
220         self._home_dir = ""
221
222         # lock to prevent concurrent applications on the same node,
223         # to execute commands at the same time. There are potential
224         # concurrency issues when using SSH to a same host from 
225         # multiple threads. There are also possible operational 
226         # issues, e.g. an application querying the existence 
227         # of a file or folder prior to its creation, and another 
228         # application creating the same file or folder in between.
229         self._node_lock = threading.Lock()
230     
231     def log_message(self, msg):
232         return " guid %d - host %s - %s " % (self.guid, 
233                 self.get("hostname"), msg)
234
235     @property
236     def home_dir(self):
237         home = self.get("home") or ""
238         if not home.startswith("/"):
239            home = os.path.join(self._home_dir, home) 
240         return home
241
242     @property
243     def nepi_home(self):
244         return os.path.join(self.home_dir, ".nepi")
245
246     @property
247     def usr_dir(self):
248         return os.path.join(self.nepi_home, "nepi-usr")
249
250     @property
251     def lib_dir(self):
252         return os.path.join(self.usr_dir, "lib")
253
254     @property
255     def bin_dir(self):
256         return os.path.join(self.usr_dir, "bin")
257
258     @property
259     def src_dir(self):
260         return os.path.join(self.usr_dir, "src")
261
262     @property
263     def share_dir(self):
264         return os.path.join(self.usr_dir, "share")
265
266     @property
267     def exp_dir(self):
268         return os.path.join(self.nepi_home, "nepi-exp")
269
270     @property
271     def exp_home(self):
272         return os.path.join(self.exp_dir, self.ec.exp_id)
273
274     @property
275     def node_home(self):
276         return os.path.join(self.exp_home, "node-%d" % self.guid)
277
278     @property
279     def run_home(self):
280         return os.path.join(self.node_home, self.ec.run_id)
281
282     @property
283     def os(self):
284         if self._os:
285             return self._os
286
287         if not self.localhost and not self.get("username"):
288             msg = "Can't resolve OS, insufficient data "
289             self.error(msg)
290             raise RuntimeError, msg
291
292         out = self.get_os()
293
294         if out.find("Fedora release 8") == 0:
295             self._os = OSType.FEDORA_8
296         elif out.find("Fedora release 12") == 0:
297             self._os = OSType.FEDORA_12
298         elif out.find("Fedora release 14") == 0:
299             self._os = OSType.FEDORA_14
300         elif out.find("Fedora release") == 0:
301             self._os = OSType.FEDORA
302         elif out.find("Debian") == 0: 
303             self._os = OSType.DEBIAN
304         elif out.find("Ubuntu") ==0:
305             self._os = OSType.UBUNTU
306         else:
307             msg = "Unsupported OS"
308             self.error(msg, out)
309             raise RuntimeError, "%s - %s " %( msg, out )
310
311         return self._os
312
313     def get_os(self):
314         # The underlying SSH layer will sometimes return an empty
315         # output (even if the command was executed without errors).
316         # To work arround this, repeat the operation N times or
317         # until the result is not empty string
318         out = ""
319         try:
320             (out, err), proc = self.execute("cat /etc/issue", 
321                     with_lock = True,
322                     blocking = True)
323         except:
324             trace = traceback.format_exc()
325             msg = "Error detecting OS: %s " % trace
326             self.error(msg, out, err)
327         
328         return out
329
330     @property
331     def use_deb(self):
332         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
333
334     @property
335     def use_rpm(self):
336         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
337                 OSType.FEDORA]
338
339     @property
340     def localhost(self):
341         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
342
343     def do_provision(self):
344         # check if host is alive
345         if not self.is_alive():
346             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
347             self.error(msg)
348             raise RuntimeError, msg
349
350         self.find_home()
351
352         if self.get("cleanProcesses"):
353             self.clean_processes()
354
355         if self.get("cleanHome"):
356             self.clean_home()
357  
358         if self.get("cleanExperiment"):
359             self.clean_experiment()
360     
361         # Create shared directory structure and node home directory
362         paths = [self.lib_dir, 
363             self.bin_dir, 
364             self.src_dir, 
365             self.share_dir, 
366             self.node_home]
367
368         self.mkdir(paths)
369
370         # Get Public IP address if possible
371         if not self.get("ip"):
372             ip = None
373
374             if self.localhost:
375                 ip = socket.gethostbyname(socket.gethostname())
376             else:
377                 try:
378                     ip = socket.gethostbyname(self.get("hostname"))
379                 except:
380                     msg = "DNS can not resolve hostname %s" % self.get("hostname") 
381                     self.debug(msg)
382
383             self.set("ip", ip)
384
385         super(LinuxNode, self).do_provision()
386
387     def do_deploy(self):
388         if self.state == ResourceState.NEW:
389             self.info("Deploying node")
390             self.do_discover()
391             self.do_provision()
392
393         # Node needs to wait until all associated interfaces are 
394         # ready before it can finalize deployment
395         from nepi.resources.linux.interface import LinuxInterface
396         ifaces = self.get_connected(LinuxInterface.get_rtype())
397         for iface in ifaces:
398             if iface.state < ResourceState.READY:
399                 self.ec.schedule(self.reschedule_delay, self.deploy)
400                 return 
401
402         super(LinuxNode, self).do_deploy()
403
404     def do_release(self):
405         rms = self.get_connected()
406         for rm in rms:
407             # Node needs to wait until all associated RMs are released
408             # before it can be released
409             if rm.state != ResourceState.RELEASED:
410                 self.ec.schedule(self.reschedule_delay, self.release)
411                 return 
412
413         tear_down = self.get("tearDown")
414         if tear_down:
415             self.execute(tear_down)
416
417         self.clean_processes()
418
419         super(LinuxNode, self).do_release()
420
421     def valid_connection(self, guid):
422         # TODO: Validate!
423         return True
424
425     def clean_processes(self):
426         self.info("Cleaning up processes")
427
428         if self.localhost:
429             return 
430         
431         if self.get("username") != 'root':
432             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
433                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
434         else:
435             if self.state >= ResourceState.READY:
436                 import pickle
437                 pids = pickle.load(open("/tmp/save.proc", "rb"))
438                 pids_temp = dict()
439                 ps_aux = "ps aux |awk '{print $2,$11}'"
440                 (out, err), proc = self.execute(ps_aux)
441                 if len(out) != 0:
442                     for line in out.strip().split("\n"):
443                         parts = line.strip().split(" ")
444                         pids_temp[parts[0]] = parts[1]
445                     kill_pids = set(pids_temp.items()) - set(pids.items())
446                     kill_pids = ' '.join(dict(kill_pids).keys())
447
448                     cmd = ("killall tcpdump || /bin/true ; " +
449                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
450                         "kill %s || /bin/true ; " % kill_pids)
451                 else:
452                     cmd = ("killall tcpdump || /bin/true ; " +
453                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
454             else:
455                 cmd = ("killall tcpdump || /bin/true ; " +
456                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
457
458         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
459
460     def clean_home(self):
461         """ Cleans all NEPI related folders in the Linux host
462         """
463         self.info("Cleaning up home")
464         
465         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
466                 self.home_dir )
467
468         return self.execute(cmd, with_lock = True)
469
470     def clean_experiment(self):
471         """ Cleans all experiment related files in the Linux host.
472         It preserves NEPI files and folders that have a multi experiment
473         scope.
474         """
475         self.info("Cleaning up experiment files")
476         
477         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
478                 self.exp_dir,
479                 self.ec.exp_id )
480             
481         return self.execute(cmd, with_lock = True)
482
483     def execute(self, command,
484             sudo = False,
485             env = None,
486             tty = False,
487             forward_x11 = False,
488             retry = 3,
489             connect_timeout = 30,
490             strict_host_checking = False,
491             persistent = True,
492             blocking = True,
493             with_lock = False
494             ):
495         """ Notice that this invocation will block until the
496         execution finishes. If this is not the desired behavior,
497         use 'run' instead."""
498
499         if self.localhost:
500             (out, err), proc = execfuncs.lexec(command, 
501                     user = self.get("username"), # still problem with localhost
502                     sudo = sudo,
503                     env = env)
504         else:
505             if with_lock:
506                 # If the execute command is blocking, we don't want to keep
507                 # the node lock. This lock is used to avoid race conditions
508                 # when creating the ControlMaster sockets. A more elegant
509                 # solution is needed.
510                 with self._node_lock:
511                     (out, err), proc = sshfuncs.rexec(
512                         command, 
513                         host = self.get("hostname"),
514                         user = self.get("username"),
515                         port = self.get("port"),
516                         gwuser = self.get("gatewayUser"),
517                         gw = self.get("gateway"),
518                         agent = True,
519                         sudo = sudo,
520                         identity = self.get("identity"),
521                         server_key = self.get("serverKey"),
522                         env = env,
523                         tty = tty,
524                         forward_x11 = forward_x11,
525                         retry = retry,
526                         connect_timeout = connect_timeout,
527                         persistent = persistent,
528                         blocking = blocking, 
529                         strict_host_checking = strict_host_checking
530                         )
531             else:
532                 (out, err), proc = sshfuncs.rexec(
533                     command, 
534                     host = self.get("hostname"),
535                     user = self.get("username"),
536                     port = self.get("port"),
537                     gwuser = self.get("gatewayUser"),
538                     gw = self.get("gateway"),
539                     agent = True,
540                     sudo = sudo,
541                     identity = self.get("identity"),
542                     server_key = self.get("serverKey"),
543                     env = env,
544                     tty = tty,
545                     forward_x11 = forward_x11,
546                     retry = retry,
547                     connect_timeout = connect_timeout,
548                     persistent = persistent,
549                     blocking = blocking, 
550                     strict_host_checking = strict_host_checking
551                     )
552
553         return (out, err), proc
554
555     def run(self, command, home,
556             create_home = False,
557             pidfile = 'pidfile',
558             stdin = None, 
559             stdout = 'stdout', 
560             stderr = 'stderr', 
561             sudo = False,
562             tty = False,
563             strict_host_checking = False):
564         
565         self.debug("Running command '%s'" % command)
566         
567         if self.localhost:
568             (out, err), proc = execfuncs.lspawn(command, pidfile,
569                     home = home, 
570                     create_home = create_home, 
571                     stdin = stdin or '/dev/null',
572                     stdout = stdout or '/dev/null',
573                     stderr = stderr or '/dev/null',
574                     sudo = sudo) 
575         else:
576             with self._node_lock:
577                 (out, err), proc = sshfuncs.rspawn(
578                     command,
579                     pidfile = pidfile,
580                     home = home,
581                     create_home = create_home,
582                     stdin = stdin or '/dev/null',
583                     stdout = stdout or '/dev/null',
584                     stderr = stderr or '/dev/null',
585                     sudo = sudo,
586                     host = self.get("hostname"),
587                     user = self.get("username"),
588                     port = self.get("port"),
589                     gwuser = self.get("gatewayUser"),
590                     gw = self.get("gateway"),
591                     agent = True,
592                     identity = self.get("identity"),
593                     server_key = self.get("serverKey"),
594                     tty = tty,
595                     strict_host_checking = strict_host_checking
596                     )
597
598         return (out, err), proc
599
600     def getpid(self, home, pidfile = "pidfile"):
601         if self.localhost:
602             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
603         else:
604             with self._node_lock:
605                 pidtuple = sshfuncs.rgetpid(
606                     os.path.join(home, pidfile),
607                     host = self.get("hostname"),
608                     user = self.get("username"),
609                     port = self.get("port"),
610                     gwuser = self.get("gatewayUser"),
611                     gw = self.get("gateway"),
612                     agent = True,
613                     identity = self.get("identity"),
614                     server_key = self.get("serverKey"),
615                     strict_host_checking = False
616                     )
617         
618         return pidtuple
619
620     def status(self, pid, ppid):
621         if self.localhost:
622             status = execfuncs.lstatus(pid, ppid)
623         else:
624             with self._node_lock:
625                 status = sshfuncs.rstatus(
626                         pid, ppid,
627                         host = self.get("hostname"),
628                         user = self.get("username"),
629                         port = self.get("port"),
630                         gwuser = self.get("gatewayUser"),
631                         gw = self.get("gateway"),
632                         agent = True,
633                         identity = self.get("identity"),
634                         server_key = self.get("serverKey"),
635                         strict_host_checking = False
636                         )
637            
638         return status
639     
640     def kill(self, pid, ppid, sudo = False):
641         out = err = ""
642         proc = None
643         status = self.status(pid, ppid)
644
645         if status == sshfuncs.ProcStatus.RUNNING:
646             if self.localhost:
647                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
648             else:
649                 with self._node_lock:
650                     (out, err), proc = sshfuncs.rkill(
651                         pid, ppid,
652                         host = self.get("hostname"),
653                         user = self.get("username"),
654                         port = self.get("port"),
655                         gwuser = self.get("gatewayUser"),
656                         gw = self.get("gateway"),
657                         agent = True,
658                         sudo = sudo,
659                         identity = self.get("identity"),
660                         server_key = self.get("serverKey"),
661                         strict_host_checking = False
662                         )
663
664         return (out, err), proc
665
666     def copy(self, src, dst):
667         if self.localhost:
668             (out, err), proc = execfuncs.lcopy(src, dst, 
669                     recursive = True)
670         else:
671             with self._node_lock:
672                 (out, err), proc = sshfuncs.rcopy(
673                     src, dst, 
674                     port = self.get("port"),
675                     gwuser = self.get("gatewayUser"),
676                     gw = self.get("gateway"),
677                     identity = self.get("identity"),
678                     server_key = self.get("serverKey"),
679                     recursive = True,
680                     strict_host_checking = False)
681
682         return (out, err), proc
683
684     def upload(self, src, dst, text = False, overwrite = True,
685             raise_on_error = True):
686         """ Copy content to destination
687
688         src  string with the content to copy. Can be:
689             - plain text
690             - a string with the path to a local file
691             - a string with a semi-colon separeted list of local files
692             - a string with a local directory
693
694         dst  string with destination path on the remote host (remote is 
695             always self.host)
696
697         text src is text input, it must be stored into a temp file before 
698         uploading
699         """
700         # If source is a string input 
701         f = None
702         if text and not os.path.isfile(src):
703             # src is text input that should be uploaded as file
704             # create a temporal file with the content to upload
705             f = tempfile.NamedTemporaryFile(delete=False)
706             f.write(src)
707             f.close()
708             src = f.name
709
710         # If dst files should not be overwritten, check that the files do not
711         # exits already
712         if isinstance(src, str):
713             src = map(str.strip, src.split(";"))
714     
715         if overwrite == False:
716             src = self.filter_existing_files(src, dst)
717             if not src:
718                 return ("", ""), None
719
720         if not self.localhost:
721             # Build destination as <user>@<server>:<path>
722             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
723
724         ((out, err), proc) = self.copy(src, dst)
725
726         # clean up temp file
727         if f:
728             os.remove(f.name)
729
730         if err:
731             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
732             self.error(msg, out, err)
733             
734             msg = "%s out: %s err: %s" % (msg, out, err)
735             if raise_on_error:
736                 raise RuntimeError, msg
737
738         return ((out, err), proc)
739
740     def download(self, src, dst, raise_on_error = True):
741         if not self.localhost:
742             # Build destination as <user>@<server>:<path>
743             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
744
745         ((out, err), proc) = self.copy(src, dst)
746
747         if err:
748             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
749             self.error(msg, out, err)
750
751             if raise_on_error:
752                 raise RuntimeError, msg
753
754         return ((out, err), proc)
755
756     def install_packages_command(self, packages):
757         command = ""
758         if self.use_rpm:
759             command = rpmfuncs.install_packages_command(self.os, packages)
760         elif self.use_deb:
761             command = debfuncs.install_packages_command(self.os, packages)
762         else:
763             msg = "Error installing packages ( OS not known ) "
764             self.error(msg, self.os)
765             raise RuntimeError, msg
766
767         return command
768
769     def install_packages(self, packages, home, run_home = None,
770             raise_on_error = True):
771         """ Install packages in the Linux host.
772
773         'home' is the directory to upload the package installation script.
774         'run_home' is the directory from where to execute the script.
775         """
776         command = self.install_packages_command(packages)
777
778         run_home = run_home or home
779
780         (out, err), proc = self.run_and_wait(command, run_home, 
781             shfile = os.path.join(home, "instpkg.sh"),
782             pidfile = "instpkg_pidfile",
783             ecodefile = "instpkg_exitcode",
784             stdout = "instpkg_stdout", 
785             stderr = "instpkg_stderr",
786             overwrite = False,
787             raise_on_error = raise_on_error)
788
789         return (out, err), proc 
790
791     def remove_packages(self, packages, home, run_home = None,
792             raise_on_error = True):
793         """ Uninstall packages from the Linux host.
794
795         'home' is the directory to upload the package un-installation script.
796         'run_home' is the directory from where to execute the script.
797         """
798         if self.use_rpm:
799             command = rpmfuncs.remove_packages_command(self.os, packages)
800         elif self.use_deb:
801             command = debfuncs.remove_packages_command(self.os, packages)
802         else:
803             msg = "Error removing packages ( OS not known ) "
804             self.error(msg)
805             raise RuntimeError, msg
806
807         run_home = run_home or home
808
809         (out, err), proc = self.run_and_wait(command, run_home, 
810             shfile = os.path.join(home, "rmpkg.sh"),
811             pidfile = "rmpkg_pidfile",
812             ecodefile = "rmpkg_exitcode",
813             stdout = "rmpkg_stdout", 
814             stderr = "rmpkg_stderr",
815             overwrite = False,
816             raise_on_error = raise_on_error)
817          
818         return (out, err), proc 
819
820     def mkdir(self, paths, clean = False):
821         """ Paths is either a single remote directory path to create,
822         or a list of directories to create.
823         """
824         if clean:
825             self.rmdir(paths)
826
827         if isinstance(paths, str):
828             paths = [paths]
829
830         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
831
832         return self.execute(cmd, with_lock = True)
833
834     def rmdir(self, paths):
835         """ Paths is either a single remote directory path to delete,
836         or a list of directories to delete.
837         """
838
839         if isinstance(paths, str):
840             paths = [paths]
841
842         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
843
844         return self.execute(cmd, with_lock = True)
845         
846     def run_and_wait(self, command, home, 
847             shfile="cmd.sh",
848             env=None,
849             overwrite=True,
850             wait_run=True,
851             pidfile="pidfile", 
852             ecodefile="exitcode", 
853             stdin=None, 
854             stdout="stdout", 
855             stderr="stderr", 
856             sudo=False,
857             tty=False,
858             raise_on_error=True):
859         """
860         Uploads the 'command' to a bash script in the host.
861         Then runs the script detached in background in the host, and
862         busy-waites until the script finishes executing.
863         """
864
865         if not shfile.startswith("/"):
866             shfile = os.path.join(home, shfile)
867
868         self.upload_command(command, 
869             shfile = shfile, 
870             ecodefile = ecodefile, 
871             env = env,
872             overwrite = overwrite)
873
874         command = "bash %s" % shfile
875         # run command in background in remote host
876         (out, err), proc = self.run(command, home, 
877                 pidfile = pidfile,
878                 stdin = stdin, 
879                 stdout = stdout, 
880                 stderr = stderr, 
881                 sudo = sudo,
882                 tty = tty)
883
884         # check no errors occurred
885         if proc.poll():
886             msg = " Failed to run command '%s' " % command
887             self.error(msg, out, err)
888             if raise_on_error:
889                 raise RuntimeError, msg
890
891         # Wait for pid file to be generated
892         pid, ppid = self.wait_pid(
893                 home = home, 
894                 pidfile = pidfile, 
895                 raise_on_error = raise_on_error)
896
897         if wait_run:
898             # wait until command finishes to execute
899             self.wait_run(pid, ppid)
900           
901             (eout, err), proc = self.check_errors(home,
902                 ecodefile = ecodefile,
903                 stderr = stderr)
904
905             # Out is what was written in the stderr file
906             if err:
907                 msg = " Failed to run command '%s' " % command
908                 self.error(msg, eout, err)
909
910                 if raise_on_error:
911                     raise RuntimeError, msg
912
913         (out, oerr), proc = self.check_output(home, stdout)
914         
915         return (out, err), proc
916         
917     def exitcode(self, home, ecodefile = "exitcode"):
918         """
919         Get the exit code of an application.
920         Returns an integer value with the exit code 
921         """
922         (out, err), proc = self.check_output(home, ecodefile)
923
924         # Succeeded to open file, return exit code in the file
925         if proc.wait() == 0:
926             try:
927                 return int(out.strip())
928             except:
929                 # Error in the content of the file!
930                 return ExitCode.CORRUPTFILE
931
932         # No such file or directory
933         if proc.returncode == 1:
934             return ExitCode.FILENOTFOUND
935         
936         # Other error from 'cat'
937         return ExitCode.ERROR
938
939     def upload_command(self, command, 
940             shfile="cmd.sh",
941             ecodefile="exitcode",
942             overwrite=True,
943             env=None):
944         """ Saves the command as a bash script file in the remote host, and
945         forces to save the exit code of the command execution to the ecodefile
946         """
947
948         if not (command.strip().endswith(";") or command.strip().endswith("&")):
949             command += ";"
950       
951         # The exit code of the command will be stored in ecodefile
952         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
953                 'command': command,
954                 'ecodefile': ecodefile,
955                 } 
956
957         # Export environment
958         environ = self.format_environment(env)
959
960         # Add environ to command
961         command = environ + command
962
963         return self.upload(command, shfile, text=True, overwrite=overwrite)
964
965     def format_environment(self, env, inline=False):
966         """ Formats the environment variables for a command to be executed
967         either as an inline command
968         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
969         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
970         """
971         if not env: return ""
972
973         # Remove extra white spaces
974         env = re.sub(r'\s+', ' ', env.strip())
975
976         sep = ";" if inline else "\n"
977         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
978
979     def check_errors(self, home, 
980             ecodefile = "exitcode", 
981             stderr = "stderr"):
982         """ Checks whether errors occurred while running a command.
983         It first checks the exit code for the command, and only if the
984         exit code is an error one it returns the error output.
985
986         """
987         proc = None
988         err = ""
989
990         # get exit code saved in the 'exitcode' file
991         ecode = self.exitcode(home, ecodefile)
992
993         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
994             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
995         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
996             # The process returned an error code or didn't exist. 
997             # Check standard error.
998             (err, eerr), proc = self.check_output(home, stderr)
999
1000             # If the stderr file was not found, assume nothing bad happened,
1001             # and just ignore the error.
1002             # (cat returns 1 for error "No such file or directory")
1003             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1004                 err = "" 
1005             
1006         return ("", err), proc
1007  
1008     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1009         """ Waits until the pid file for the command is generated, 
1010             and returns the pid and ppid of the process """
1011         pid = ppid = None
1012         delay = 1.0
1013
1014         for i in xrange(2):
1015             pidtuple = self.getpid(home = home, pidfile = pidfile)
1016             
1017             if pidtuple:
1018                 pid, ppid = pidtuple
1019                 break
1020             else:
1021                 time.sleep(delay)
1022                 delay = delay * 1.5
1023         else:
1024             msg = " Failed to get pid for pidfile %s/%s " % (
1025                     home, pidfile )
1026             self.error(msg)
1027             
1028             if raise_on_error:
1029                 raise RuntimeError, msg
1030
1031         return pid, ppid
1032
1033     def wait_run(self, pid, ppid, trial = 0):
1034         """ wait for a remote process to finish execution """
1035         delay = 1.0
1036
1037         while True:
1038             status = self.status(pid, ppid)
1039             
1040             if status is ProcStatus.FINISHED:
1041                 break
1042             elif status is not ProcStatus.RUNNING:
1043                 delay = delay * 1.5
1044                 time.sleep(delay)
1045                 # If it takes more than 20 seconds to start, then
1046                 # asume something went wrong
1047                 if delay > 20:
1048                     break
1049             else:
1050                 # The app is running, just wait...
1051                 time.sleep(0.5)
1052
1053     def check_output(self, home, filename):
1054         """ Retrives content of file """
1055         (out, err), proc = self.execute("cat %s" % 
1056             os.path.join(home, filename), retry = 1, with_lock = True)
1057         return (out, err), proc
1058
1059     def is_alive(self):
1060         """ Checks if host is responsive
1061         """
1062         if self.localhost:
1063             return True
1064
1065         out = err = ""
1066         msg = "Unresponsive host. Wrong answer. "
1067
1068         # The underlying SSH layer will sometimes return an empty
1069         # output (even if the command was executed without errors).
1070         # To work arround this, repeat the operation N times or
1071         # until the result is not empty string
1072         try:
1073             (out, err), proc = self.execute("echo 'ALIVE'",
1074                     blocking = True,
1075                     with_lock = True)
1076     
1077             if out.find("ALIVE") > -1:
1078                 return True
1079         except:
1080             trace = traceback.format_exc()
1081             msg = "Unresponsive host. Error reaching host: %s " % trace
1082
1083         self.error(msg, out, err)
1084         return False
1085
1086     def find_home(self):
1087         """ Retrieves host home directory
1088         """
1089         # The underlying SSH layer will sometimes return an empty
1090         # output (even if the command was executed without errors).
1091         # To work arround this, repeat the operation N times or
1092         # until the result is not empty string
1093         msg = "Impossible to retrieve HOME directory"
1094         try:
1095             (out, err), proc = self.execute("echo ${HOME}",
1096                     blocking = True,
1097                     with_lock = True)
1098     
1099             if out.strip() != "":
1100                 self._home_dir =  out.strip()
1101         except:
1102             trace = traceback.format_exc()
1103             msg = "Impossible to retrieve HOME directory %s" % trace
1104
1105         if not self._home_dir:
1106             self.error(msg)
1107             raise RuntimeError, msg
1108
1109     def filter_existing_files(self, src, dst):
1110         """ Removes files that already exist in the Linux host from src list
1111         """
1112         # construct a dictionary with { dst: src }
1113         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1114                     if len(src) > 1 else dict({dst: src[0]})
1115
1116         command = []
1117         for d in dests.keys():
1118             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1119
1120         command = ";".join(command)
1121
1122         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1123     
1124         for d in dests.keys():
1125             if out.find(d) > -1:
1126                 del dests[d]
1127
1128         if not dests:
1129             return []
1130
1131         return dests.values()
1132