7873b97a57fa234e9e4e51ee45dbee48cc35ebfe
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import socket
32 import tempfile
33 import time
34 import threading
35 import traceback
36
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!! 
39
40 class ExitCode:
41     """
42     Error codes that the rexitcode function can return if unable to
43     check the exit code of a spawned process
44     """
45     FILENOTFOUND = -1
46     CORRUPTFILE = -2
47     ERROR = -3
48     OK = 0
49
50 class OSType:
51     """
52     Supported flavors of Linux OS
53     """
54     FEDORA_8 = "f8"
55     FEDORA_12 = "f12"
56     FEDORA_14 = "f14"
57     FEDORA = "fedora"
58     UBUNTU = "ubuntu"
59     DEBIAN = "debian"
60
61 @clsinit_copy
62 class LinuxNode(ResourceManager):
63     """
64     .. class:: Class Args :
65       
66         :param ec: The Experiment controller
67         :type ec: ExperimentController
68         :param guid: guid of the RM
69         :type guid: int
70
71     .. note::
72
73         There are different ways in which commands can be executed using the
74         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
75         'run_and_wait'). 
76         
77         Brief explanation:
78
79             * 'execute' (blocking mode) :  
80
81                      HOW IT WORKS: 'execute', forks a process and run the
82                      command, synchronously, attached to the terminal, in
83                      foreground.
84                      The execute method will block until the command returns
85                      the result on 'out', 'err' (so until it finishes executing).
86   
87                      USAGE: short-lived commands that must be executed attached
88                      to a terminal and in foreground, for which it IS necessary
89                      to block until the command has finished (e.g. if you want
90                      to run 'ls' or 'cat').
91
92             * 'execute' (NON blocking mode - blocking = False) :
93
94                     HOW IT WORKS: Same as before, except that execute method
95                     will return immediately (even if command still running).
96
97                     USAGE: long-lived commands that must be executed attached
98                     to a terminal and in foreground, but for which it is not
99                     necessary to block until the command has finished. (e.g.
100                     start an application using X11 forwarding)
101
102              * 'run' :
103
104                    HOW IT WORKS: Connects to the host ( using SSH if remote)
105                    and launches the command in background, detached from any
106                    terminal (daemonized), and returns. The command continues to
107                    run remotely, but since it is detached from the terminal,
108                    its pipes (stdin, stdout, stderr) can't be redirected to the
109                    console (as normal non detached processes would), and so they
110                    are explicitly redirected to files. The pidfile is created as
111                    part of the process of launching the command. The pidfile
112                    holds the pid and ppid of the process forked in background,
113                    so later on it is possible to check whether the command is still
114                    running.
115
116                     USAGE: long-lived commands that can run detached in background,
117                     for which it is NOT necessary to block (wait) until the command
118                     has finished. (e.g. start an application that is not using X11
119                     forwarding. It can run detached and remotely in background)
120
121              * 'run_and_wait' :
122
123                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124                     the command has finished execution. It also checks whether
125                     errors occurred during runtime by reading the exitcode file,
126                     which contains the exit code of the command that was run
127                     (checking stderr only is not always reliable since many
128                     commands throw debugging info to stderr and the only way to
129                     automatically know whether an error really happened is to
130                     check the process exit code).
131
132                     Another difference with respect to 'run', is that instead
133                     of directly executing the command as a bash command line,
134                     it uploads the command to a bash script and runs the script.
135                     This allows to use the bash script to debug errors, since
136                     it remains at the remote host and can be run manually to
137                     reproduce the error.
138                   
139                     USAGE: medium-lived commands that can run detached in
140                     background, for which it IS necessary to block (wait) until
141                     the command has finished. (e.g. Package installation,
142                     source compilation, file download, etc)
143
144     """
145     _rtype = "LinuxNode"
146     _help = "Controls Linux host machines ( either localhost or a host " \
147             "that can be accessed using a SSH key)"
148     _backend_type = "linux"
149
150     @classmethod
151     def _register_attributes(cls):
152         hostname = Attribute("hostname", "Hostname of the machine",
153                 flags = Flags.Design)
154
155         username = Attribute("username", "Local account username", 
156                 flags = Flags.Credential)
157
158         port = Attribute("port", "SSH port", flags = Flags.Design)
159         
160         home = Attribute("home",
161                 "Experiment home directory to store all experiment related files",
162                 flags = Flags.Design)
163         
164         identity = Attribute("identity", "SSH identity file",
165                 flags = Flags.Credential)
166         
167         server_key = Attribute("serverKey", "Server public key", 
168                 flags = Flags.Design)
169         
170         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
171                 " from node home folder before starting experiment", 
172                 type = Types.Bool,
173                 default = False,
174                 flags = Flags.Design)
175
176         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
177                 " from a previous same experiment, before the new experiment starts", 
178                 type = Types.Bool,
179                 default = False,
180                 flags = Flags.Design)
181         
182         clean_processes = Attribute("cleanProcesses", 
183                 "Kill all running processes before starting experiment",
184                 type = Types.Bool,
185                 default = False,
186                 flags = Flags.Design)
187         
188         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
189                 "releasing the resource",
190                 flags = Flags.Design)
191
192         gateway_user = Attribute("gatewayUser", "Gateway account username",
193                 flags = Flags.Design)
194
195         gateway = Attribute("gateway", "Hostname of the gateway machine",
196                 flags = Flags.Design)
197
198         ip = Attribute("ip", "Linux host public IP address. "
199                    "Must not be modified by the user unless hostname is 'localhost'",
200                     flags = Flags.Design)
201
202         cls._register_attribute(hostname)
203         cls._register_attribute(username)
204         cls._register_attribute(port)
205         cls._register_attribute(home)
206         cls._register_attribute(identity)
207         cls._register_attribute(server_key)
208         cls._register_attribute(clean_home)
209         cls._register_attribute(clean_experiment)
210         cls._register_attribute(clean_processes)
211         cls._register_attribute(tear_down)
212         cls._register_attribute(gateway_user)
213         cls._register_attribute(gateway)
214         cls._register_attribute(ip)
215
216     def __init__(self, ec, guid):
217         super(LinuxNode, self).__init__(ec, guid)
218         self._os = None
219         # home directory at Linux host
220         self._home_dir = ""
221
222         # lock to prevent concurrent applications on the same node,
223         # to execute commands at the same time. There are potential
224         # concurrency issues when using SSH to a same host from 
225         # multiple threads. There are also possible operational 
226         # issues, e.g. an application querying the existence 
227         # of a file or folder prior to its creation, and another 
228         # application creating the same file or folder in between.
229         self._node_lock = threading.Lock()
230     
231     def log_message(self, msg):
232         return " guid %d - host %s - %s " % (self.guid, 
233                 self.get("hostname"), msg)
234
235     @property
236     def home_dir(self):
237         home = self.get("home") or ""
238         if not home.startswith("/"):
239            home = os.path.join(self._home_dir, home) 
240         return home
241
242     @property
243     def nepi_home(self):
244         return os.path.join(self.home_dir, ".nepi")
245
246     @property
247     def usr_dir(self):
248         return os.path.join(self.nepi_home, "nepi-usr")
249
250     @property
251     def lib_dir(self):
252         return os.path.join(self.usr_dir, "lib")
253
254     @property
255     def bin_dir(self):
256         return os.path.join(self.usr_dir, "bin")
257
258     @property
259     def src_dir(self):
260         return os.path.join(self.usr_dir, "src")
261
262     @property
263     def share_dir(self):
264         return os.path.join(self.usr_dir, "share")
265
266     @property
267     def exp_dir(self):
268         return os.path.join(self.nepi_home, "nepi-exp")
269
270     @property
271     def exp_home(self):
272         return os.path.join(self.exp_dir, self.ec.exp_id)
273
274     @property
275     def node_home(self):
276         return os.path.join(self.exp_home, "node-%d" % self.guid)
277
278     @property
279     def run_home(self):
280         return os.path.join(self.node_home, self.ec.run_id)
281
282     @property
283     def os(self):
284         if self._os:
285             return self._os
286
287         if not self.localhost and not self.get("username"):
288             msg = "Can't resolve OS, insufficient data "
289             self.error(msg)
290             raise RuntimeError, msg
291
292         out = self.get_os()
293
294         if out.find("Fedora release 8") == 0:
295             self._os = OSType.FEDORA_8
296         elif out.find("Fedora release 12") == 0:
297             self._os = OSType.FEDORA_12
298         elif out.find("Fedora release 14") == 0:
299             self._os = OSType.FEDORA_14
300         elif out.find("Fedora release") == 0:
301             self._os = OSType.FEDORA
302         elif out.find("Debian") == 0: 
303             self._os = OSType.DEBIAN
304         elif out.find("Ubuntu") ==0:
305             self._os = OSType.UBUNTU
306         else:
307             msg = "Unsupported OS"
308             self.error(msg, out)
309             raise RuntimeError, "%s - %s " %( msg, out )
310
311         return self._os
312
313     def get_os(self):
314         # The underlying SSH layer will sometimes return an empty
315         # output (even if the command was executed without errors).
316         # To work arround this, repeat the operation N times or
317         # until the result is not empty string
318         out = ""
319         try:
320             (out, err), proc = self.execute("cat /etc/issue", 
321                     with_lock = True,
322                     blocking = True)
323         except:
324             trace = traceback.format_exc()
325             msg = "Error detecting OS: %s " % trace
326             self.error(msg, out, err)
327         
328         return out
329
330     @property
331     def use_deb(self):
332         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
333
334     @property
335     def use_rpm(self):
336         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
337                 OSType.FEDORA]
338
339     @property
340     def localhost(self):
341         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
342
343     def do_provision(self):
344         # check if host is alive
345         if not self.is_alive():
346             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
347             self.error(msg)
348             raise RuntimeError, msg
349
350         self.find_home()
351
352         if self.get("cleanProcesses"):
353             self.clean_processes()
354
355         if self.get("cleanHome"):
356             self.clean_home()
357  
358         if self.get("cleanExperiment"):
359             self.clean_experiment()
360     
361         # Create shared directory structure and node home directory
362         paths = [self.lib_dir, 
363             self.bin_dir, 
364             self.src_dir, 
365             self.share_dir, 
366             self.node_home]
367
368         self.mkdir(paths)
369
370         # Get Public IP address if possible
371         if not self.get("ip"):
372             if self.localhost:
373                 try:
374                    ip = socket.gethostbyname(socket.gethostname())
375                 except:
376                     msg = "DNS can not resolve hostname %s" % self.get("hostname") 
377                     self.debug(msg)
378             else:
379                 ip = socket.gethostbyname(self.get("hostname"))
380
381             self.set("ip", ip)
382
383         super(LinuxNode, self).do_provision()
384
385     def do_deploy(self):
386         if self.state == ResourceState.NEW:
387             self.info("Deploying node")
388             self.do_discover()
389             self.do_provision()
390
391         # Node needs to wait until all associated interfaces are 
392         # ready before it can finalize deployment
393         from nepi.resources.linux.interface import LinuxInterface
394         ifaces = self.get_connected(LinuxInterface.get_rtype())
395         for iface in ifaces:
396             if iface.state < ResourceState.READY:
397                 self.ec.schedule(reschedule_delay, self.deploy)
398                 return 
399
400         super(LinuxNode, self).do_deploy()
401
402     def do_release(self):
403         rms = self.get_connected()
404         for rm in rms:
405             # Node needs to wait until all associated RMs are released
406             # before it can be released
407             if rm.state != ResourceState.RELEASED:
408                 self.ec.schedule(reschedule_delay, self.release)
409                 return 
410
411         tear_down = self.get("tearDown")
412         if tear_down:
413             self.execute(tear_down)
414
415         self.clean_processes()
416
417         super(LinuxNode, self).do_release()
418
419     def valid_connection(self, guid):
420         # TODO: Validate!
421         return True
422
423     def clean_processes(self):
424         self.info("Cleaning up processes")
425
426         if self.localhost:
427             return 
428         
429         if self.get("username") != 'root':
430             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
431                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
432         else:
433             if self.state >= ResourceState.READY:
434                 import pickle
435                 pids = pickle.load(open("/tmp/save.proc", "rb"))
436                 pids_temp = dict()
437                 ps_aux = "ps aux |awk '{print $2,$11}'"
438                 (out, err), proc = self.execute(ps_aux)
439                 if len(out) != 0:
440                     for line in out.strip().split("\n"):
441                         parts = line.strip().split(" ")
442                         pids_temp[parts[0]] = parts[1]
443                     kill_pids = set(pids_temp.items()) - set(pids.items())
444                     kill_pids = ' '.join(dict(kill_pids).keys())
445
446                     cmd = ("killall tcpdump || /bin/true ; " +
447                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
448                         "kill %s || /bin/true ; " % kill_pids)
449                 else:
450                     cmd = ("killall tcpdump || /bin/true ; " +
451                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
452             else:
453                 cmd = ("killall tcpdump || /bin/true ; " +
454                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
455
456         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
457
458     def clean_home(self):
459         """ Cleans all NEPI related folders in the Linux host
460         """
461         self.info("Cleaning up home")
462         
463         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
464                 self.home_dir )
465
466         return self.execute(cmd, with_lock = True)
467
468     def clean_experiment(self):
469         """ Cleans all experiment related files in the Linux host.
470         It preserves NEPI files and folders that have a multi experiment
471         scope.
472         """
473         self.info("Cleaning up experiment files")
474         
475         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
476                 self.exp_dir,
477                 self.ec.exp_id )
478             
479         return self.execute(cmd, with_lock = True)
480
481     def execute(self, command,
482             sudo = False,
483             env = None,
484             tty = False,
485             forward_x11 = False,
486             retry = 3,
487             connect_timeout = 30,
488             strict_host_checking = False,
489             persistent = True,
490             blocking = True,
491             with_lock = False
492             ):
493         """ Notice that this invocation will block until the
494         execution finishes. If this is not the desired behavior,
495         use 'run' instead."""
496
497         if self.localhost:
498             (out, err), proc = execfuncs.lexec(command, 
499                     user = self.get("username"), # still problem with localhost
500                     sudo = sudo,
501                     env = env)
502         else:
503             if with_lock:
504                 # If the execute command is blocking, we don't want to keep
505                 # the node lock. This lock is used to avoid race conditions
506                 # when creating the ControlMaster sockets. A more elegant
507                 # solution is needed.
508                 with self._node_lock:
509                     (out, err), proc = sshfuncs.rexec(
510                         command, 
511                         host = self.get("hostname"),
512                         user = self.get("username"),
513                         port = self.get("port"),
514                         gwuser = self.get("gatewayUser"),
515                         gw = self.get("gateway"),
516                         agent = True,
517                         sudo = sudo,
518                         identity = self.get("identity"),
519                         server_key = self.get("serverKey"),
520                         env = env,
521                         tty = tty,
522                         forward_x11 = forward_x11,
523                         retry = retry,
524                         connect_timeout = connect_timeout,
525                         persistent = persistent,
526                         blocking = blocking, 
527                         strict_host_checking = strict_host_checking
528                         )
529             else:
530                 (out, err), proc = sshfuncs.rexec(
531                     command, 
532                     host = self.get("hostname"),
533                     user = self.get("username"),
534                     port = self.get("port"),
535                     gwuser = self.get("gatewayUser"),
536                     gw = self.get("gateway"),
537                     agent = True,
538                     sudo = sudo,
539                     identity = self.get("identity"),
540                     server_key = self.get("serverKey"),
541                     env = env,
542                     tty = tty,
543                     forward_x11 = forward_x11,
544                     retry = retry,
545                     connect_timeout = connect_timeout,
546                     persistent = persistent,
547                     blocking = blocking, 
548                     strict_host_checking = strict_host_checking
549                     )
550
551         return (out, err), proc
552
553     def run(self, command, home,
554             create_home = False,
555             pidfile = 'pidfile',
556             stdin = None, 
557             stdout = 'stdout', 
558             stderr = 'stderr', 
559             sudo = False,
560             tty = False,
561             strict_host_checking = False):
562         
563         self.debug("Running command '%s'" % command)
564         
565         if self.localhost:
566             (out, err), proc = execfuncs.lspawn(command, pidfile,
567                     home = home, 
568                     create_home = create_home, 
569                     stdin = stdin or '/dev/null',
570                     stdout = stdout or '/dev/null',
571                     stderr = stderr or '/dev/null',
572                     sudo = sudo) 
573         else:
574             with self._node_lock:
575                 (out, err), proc = sshfuncs.rspawn(
576                     command,
577                     pidfile = pidfile,
578                     home = home,
579                     create_home = create_home,
580                     stdin = stdin or '/dev/null',
581                     stdout = stdout or '/dev/null',
582                     stderr = stderr or '/dev/null',
583                     sudo = sudo,
584                     host = self.get("hostname"),
585                     user = self.get("username"),
586                     port = self.get("port"),
587                     gwuser = self.get("gatewayUser"),
588                     gw = self.get("gateway"),
589                     agent = True,
590                     identity = self.get("identity"),
591                     server_key = self.get("serverKey"),
592                     tty = tty,
593                     strict_host_checking = strict_host_checking
594                     )
595
596         return (out, err), proc
597
598     def getpid(self, home, pidfile = "pidfile"):
599         if self.localhost:
600             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
601         else:
602             with self._node_lock:
603                 pidtuple = sshfuncs.rgetpid(
604                     os.path.join(home, pidfile),
605                     host = self.get("hostname"),
606                     user = self.get("username"),
607                     port = self.get("port"),
608                     gwuser = self.get("gatewayUser"),
609                     gw = self.get("gateway"),
610                     agent = True,
611                     identity = self.get("identity"),
612                     server_key = self.get("serverKey"),
613                     strict_host_checking = False
614                     )
615         
616         return pidtuple
617
618     def status(self, pid, ppid):
619         if self.localhost:
620             status = execfuncs.lstatus(pid, ppid)
621         else:
622             with self._node_lock:
623                 status = sshfuncs.rstatus(
624                         pid, ppid,
625                         host = self.get("hostname"),
626                         user = self.get("username"),
627                         port = self.get("port"),
628                         gwuser = self.get("gatewayUser"),
629                         gw = self.get("gateway"),
630                         agent = True,
631                         identity = self.get("identity"),
632                         server_key = self.get("serverKey"),
633                         strict_host_checking = False
634                         )
635            
636         return status
637     
638     def kill(self, pid, ppid, sudo = False):
639         out = err = ""
640         proc = None
641         status = self.status(pid, ppid)
642
643         if status == sshfuncs.ProcStatus.RUNNING:
644             if self.localhost:
645                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
646             else:
647                 with self._node_lock:
648                     (out, err), proc = sshfuncs.rkill(
649                         pid, ppid,
650                         host = self.get("hostname"),
651                         user = self.get("username"),
652                         port = self.get("port"),
653                         gwuser = self.get("gatewayUser"),
654                         gw = self.get("gateway"),
655                         agent = True,
656                         sudo = sudo,
657                         identity = self.get("identity"),
658                         server_key = self.get("serverKey"),
659                         strict_host_checking = False
660                         )
661
662         return (out, err), proc
663
664     def copy(self, src, dst):
665         if self.localhost:
666             (out, err), proc = execfuncs.lcopy(src, dst, 
667                     recursive = True)
668         else:
669             with self._node_lock:
670                 (out, err), proc = sshfuncs.rcopy(
671                     src, dst, 
672                     port = self.get("port"),
673                     gwuser = self.get("gatewayUser"),
674                     gw = self.get("gateway"),
675                     identity = self.get("identity"),
676                     server_key = self.get("serverKey"),
677                     recursive = True,
678                     strict_host_checking = False)
679
680         return (out, err), proc
681
682     def upload(self, src, dst, text = False, overwrite = True,
683             raise_on_error = True):
684         """ Copy content to destination
685
686         src  string with the content to copy. Can be:
687             - plain text
688             - a string with the path to a local file
689             - a string with a semi-colon separeted list of local files
690             - a string with a local directory
691
692         dst  string with destination path on the remote host (remote is 
693             always self.host)
694
695         text src is text input, it must be stored into a temp file before 
696         uploading
697         """
698         # If source is a string input 
699         f = None
700         if text and not os.path.isfile(src):
701             # src is text input that should be uploaded as file
702             # create a temporal file with the content to upload
703             f = tempfile.NamedTemporaryFile(delete=False)
704             f.write(src)
705             f.close()
706             src = f.name
707
708         # If dst files should not be overwritten, check that the files do not
709         # exits already
710         if isinstance(src, str):
711             src = map(str.strip, src.split(";"))
712     
713         if overwrite == False:
714             src = self.filter_existing_files(src, dst)
715             if not src:
716                 return ("", ""), None
717
718         if not self.localhost:
719             # Build destination as <user>@<server>:<path>
720             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
721
722         ((out, err), proc) = self.copy(src, dst)
723
724         # clean up temp file
725         if f:
726             os.remove(f.name)
727
728         if err:
729             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
730             self.error(msg, out, err)
731             
732             msg = "%s out: %s err: %s" % (msg, out, err)
733             if raise_on_error:
734                 raise RuntimeError, msg
735
736         return ((out, err), proc)
737
738     def download(self, src, dst, raise_on_error = True):
739         if not self.localhost:
740             # Build destination as <user>@<server>:<path>
741             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
742
743         ((out, err), proc) = self.copy(src, dst)
744
745         if err:
746             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
747             self.error(msg, out, err)
748
749             if raise_on_error:
750                 raise RuntimeError, msg
751
752         return ((out, err), proc)
753
754     def install_packages_command(self, packages):
755         command = ""
756         if self.use_rpm:
757             command = rpmfuncs.install_packages_command(self.os, packages)
758         elif self.use_deb:
759             command = debfuncs.install_packages_command(self.os, packages)
760         else:
761             msg = "Error installing packages ( OS not known ) "
762             self.error(msg, self.os)
763             raise RuntimeError, msg
764
765         return command
766
767     def install_packages(self, packages, home, run_home = None,
768             raise_on_error = True):
769         """ Install packages in the Linux host.
770
771         'home' is the directory to upload the package installation script.
772         'run_home' is the directory from where to execute the script.
773         """
774         command = self.install_packages_command(packages)
775
776         run_home = run_home or home
777
778         (out, err), proc = self.run_and_wait(command, run_home, 
779             shfile = os.path.join(home, "instpkg.sh"),
780             pidfile = "instpkg_pidfile",
781             ecodefile = "instpkg_exitcode",
782             stdout = "instpkg_stdout", 
783             stderr = "instpkg_stderr",
784             overwrite = False,
785             raise_on_error = raise_on_error)
786
787         return (out, err), proc 
788
789     def remove_packages(self, packages, home, run_home = None,
790             raise_on_error = True):
791         """ Uninstall packages from the Linux host.
792
793         'home' is the directory to upload the package un-installation script.
794         'run_home' is the directory from where to execute the script.
795         """
796         if self.use_rpm:
797             command = rpmfuncs.remove_packages_command(self.os, packages)
798         elif self.use_deb:
799             command = debfuncs.remove_packages_command(self.os, packages)
800         else:
801             msg = "Error removing packages ( OS not known ) "
802             self.error(msg)
803             raise RuntimeError, msg
804
805         run_home = run_home or home
806
807         (out, err), proc = self.run_and_wait(command, run_home, 
808             shfile = os.path.join(home, "rmpkg.sh"),
809             pidfile = "rmpkg_pidfile",
810             ecodefile = "rmpkg_exitcode",
811             stdout = "rmpkg_stdout", 
812             stderr = "rmpkg_stderr",
813             overwrite = False,
814             raise_on_error = raise_on_error)
815          
816         return (out, err), proc 
817
818     def mkdir(self, paths, clean = False):
819         """ Paths is either a single remote directory path to create,
820         or a list of directories to create.
821         """
822         if clean:
823             self.rmdir(paths)
824
825         if isinstance(paths, str):
826             paths = [paths]
827
828         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
829
830         return self.execute(cmd, with_lock = True)
831
832     def rmdir(self, paths):
833         """ Paths is either a single remote directory path to delete,
834         or a list of directories to delete.
835         """
836
837         if isinstance(paths, str):
838             paths = [paths]
839
840         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
841
842         return self.execute(cmd, with_lock = True)
843         
844     def run_and_wait(self, command, home, 
845             shfile = "cmd.sh",
846             env = None,
847             overwrite = True,
848             pidfile = "pidfile", 
849             ecodefile = "exitcode", 
850             stdin = None, 
851             stdout = "stdout", 
852             stderr = "stderr", 
853             sudo = False,
854             tty = False,
855             raise_on_error = True):
856         """
857         Uploads the 'command' to a bash script in the host.
858         Then runs the script detached in background in the host, and
859         busy-waites until the script finishes executing.
860         """
861
862         if not shfile.startswith("/"):
863             shfile = os.path.join(home, shfile)
864
865         self.upload_command(command, 
866             shfile = shfile, 
867             ecodefile = ecodefile, 
868             env = env,
869             overwrite = overwrite)
870
871         command = "bash %s" % shfile
872         # run command in background in remote host
873         (out, err), proc = self.run(command, home, 
874                 pidfile = pidfile,
875                 stdin = stdin, 
876                 stdout = stdout, 
877                 stderr = stderr, 
878                 sudo = sudo,
879                 tty = tty)
880
881         # check no errors occurred
882         if proc.poll():
883             msg = " Failed to run command '%s' " % command
884             self.error(msg, out, err)
885             if raise_on_error:
886                 raise RuntimeError, msg
887
888         # Wait for pid file to be generated
889         pid, ppid = self.wait_pid(
890                 home = home, 
891                 pidfile = pidfile, 
892                 raise_on_error = raise_on_error)
893
894         # wait until command finishes to execute
895         self.wait_run(pid, ppid)
896       
897         (eout, err), proc = self.check_errors(home,
898             ecodefile = ecodefile,
899             stderr = stderr)
900
901         # Out is what was written in the stderr file
902         if err:
903             msg = " Failed to run command '%s' " % command
904             self.error(msg, eout, err)
905
906             if raise_on_error:
907                 raise RuntimeError, msg
908
909         (out, oerr), proc = self.check_output(home, stdout)
910         
911         return (out, err), proc
912
913     def exitcode(self, home, ecodefile = "exitcode"):
914         """
915         Get the exit code of an application.
916         Returns an integer value with the exit code 
917         """
918         (out, err), proc = self.check_output(home, ecodefile)
919
920         # Succeeded to open file, return exit code in the file
921         if proc.wait() == 0:
922             try:
923                 return int(out.strip())
924             except:
925                 # Error in the content of the file!
926                 return ExitCode.CORRUPTFILE
927
928         # No such file or directory
929         if proc.returncode == 1:
930             return ExitCode.FILENOTFOUND
931         
932         # Other error from 'cat'
933         return ExitCode.ERROR
934
935     def upload_command(self, command, 
936             shfile = "cmd.sh",
937             ecodefile = "exitcode",
938             overwrite = True,
939             env = None):
940         """ Saves the command as a bash script file in the remote host, and
941         forces to save the exit code of the command execution to the ecodefile
942         """
943
944         if not (command.strip().endswith(";") or command.strip().endswith("&")):
945             command += ";"
946       
947         # The exit code of the command will be stored in ecodefile
948         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
949                 'command': command,
950                 'ecodefile': ecodefile,
951                 } 
952
953         # Export environment
954         environ = self.format_environment(env)
955
956         # Add environ to command
957         command = environ + command
958
959         return self.upload(command, shfile, text = True, overwrite = overwrite)
960
961     def format_environment(self, env, inline = False):
962         """ Formats the environment variables for a command to be executed
963         either as an inline command
964         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
965         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
966         """
967         if not env: return ""
968
969         # Remove extra white spaces
970         env = re.sub(r'\s+', ' ', env.strip())
971
972         sep = ";" if inline else "\n"
973         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
974
975     def check_errors(self, home, 
976             ecodefile = "exitcode", 
977             stderr = "stderr"):
978         """ Checks whether errors occurred while running a command.
979         It first checks the exit code for the command, and only if the
980         exit code is an error one it returns the error output.
981
982         """
983         proc = None
984         err = ""
985
986         # get exit code saved in the 'exitcode' file
987         ecode = self.exitcode(home, ecodefile)
988
989         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
990             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
991         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
992             # The process returned an error code or didn't exist. 
993             # Check standard error.
994             (err, eerr), proc = self.check_output(home, stderr)
995
996             # If the stderr file was not found, assume nothing bad happened,
997             # and just ignore the error.
998             # (cat returns 1 for error "No such file or directory")
999             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1000                 err = "" 
1001             
1002         return ("", err), proc
1003  
1004     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1005         """ Waits until the pid file for the command is generated, 
1006             and returns the pid and ppid of the process """
1007         pid = ppid = None
1008         delay = 1.0
1009
1010         for i in xrange(2):
1011             pidtuple = self.getpid(home = home, pidfile = pidfile)
1012             
1013             if pidtuple:
1014                 pid, ppid = pidtuple
1015                 break
1016             else:
1017                 time.sleep(delay)
1018                 delay = delay * 1.5
1019         else:
1020             msg = " Failed to get pid for pidfile %s/%s " % (
1021                     home, pidfile )
1022             self.error(msg)
1023             
1024             if raise_on_error:
1025                 raise RuntimeError, msg
1026
1027         return pid, ppid
1028
1029     def wait_run(self, pid, ppid, trial = 0):
1030         """ wait for a remote process to finish execution """
1031         delay = 1.0
1032
1033         while True:
1034             status = self.status(pid, ppid)
1035             
1036             if status is ProcStatus.FINISHED:
1037                 break
1038             elif status is not ProcStatus.RUNNING:
1039                 delay = delay * 1.5
1040                 time.sleep(delay)
1041                 # If it takes more than 20 seconds to start, then
1042                 # asume something went wrong
1043                 if delay > 20:
1044                     break
1045             else:
1046                 # The app is running, just wait...
1047                 time.sleep(0.5)
1048
1049     def check_output(self, home, filename):
1050         """ Retrives content of file """
1051         (out, err), proc = self.execute("cat %s" % 
1052             os.path.join(home, filename), retry = 1, with_lock = True)
1053         return (out, err), proc
1054
1055     def is_alive(self):
1056         """ Checks if host is responsive
1057         """
1058         if self.localhost:
1059             return True
1060
1061         out = err = ""
1062         msg = "Unresponsive host. Wrong answer. "
1063
1064         # The underlying SSH layer will sometimes return an empty
1065         # output (even if the command was executed without errors).
1066         # To work arround this, repeat the operation N times or
1067         # until the result is not empty string
1068         try:
1069             (out, err), proc = self.execute("echo 'ALIVE'",
1070                     blocking = True,
1071                     with_lock = True)
1072     
1073             if out.find("ALIVE") > -1:
1074                 return True
1075         except:
1076             trace = traceback.format_exc()
1077             msg = "Unresponsive host. Error reaching host: %s " % trace
1078
1079         self.error(msg, out, err)
1080         return False
1081
1082     def find_home(self):
1083         """ Retrieves host home directory
1084         """
1085         # The underlying SSH layer will sometimes return an empty
1086         # output (even if the command was executed without errors).
1087         # To work arround this, repeat the operation N times or
1088         # until the result is not empty string
1089         msg = "Impossible to retrieve HOME directory"
1090         try:
1091             (out, err), proc = self.execute("echo ${HOME}",
1092                     blocking = True,
1093                     with_lock = True)
1094     
1095             if out.strip() != "":
1096                 self._home_dir =  out.strip()
1097         except:
1098             trace = traceback.format_exc()
1099             msg = "Impossible to retrieve HOME directory %s" % trace
1100
1101         if not self._home_dir:
1102             self.error(msg)
1103             raise RuntimeError, msg
1104
1105     def filter_existing_files(self, src, dst):
1106         """ Removes files that already exist in the Linux host from src list
1107         """
1108         # construct a dictionary with { dst: src }
1109         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1110                     if len(src) > 1 else dict({dst: src[0]})
1111
1112         command = []
1113         for d in dests.keys():
1114             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1115
1116         command = ";".join(command)
1117
1118         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1119     
1120         for d in dests.keys():
1121             if out.find(d) > -1:
1122                 del dests[d]
1123
1124         if not dests:
1125             return []
1126
1127         return dests.values()
1128