Fix multihop
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.Design)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.Design)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.Design)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.Design)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.Design)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.Design)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.Design)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.Design)
190
191         gateway_user = Attribute("gatewayUser", "Gateway account username",
192                 flags = Flags.Design)
193
194         gateway = Attribute("gateway", "Hostname of the gateway machine",
195                 flags = Flags.Design)
196
197         cls._register_attribute(hostname)
198         cls._register_attribute(username)
199         cls._register_attribute(port)
200         cls._register_attribute(home)
201         cls._register_attribute(identity)
202         cls._register_attribute(server_key)
203         cls._register_attribute(clean_home)
204         cls._register_attribute(clean_experiment)
205         cls._register_attribute(clean_processes)
206         cls._register_attribute(tear_down)
207         cls._register_attribute(gateway_user)
208         cls._register_attribute(gateway)
209
210     def __init__(self, ec, guid):
211         super(LinuxNode, self).__init__(ec, guid)
212         self._os = None
213         # home directory at Linux host
214         self._home_dir = ""
215
216         # lock to prevent concurrent applications on the same node,
217         # to execute commands at the same time. There are potential
218         # concurrency issues when using SSH to a same host from 
219         # multiple threads. There are also possible operational 
220         # issues, e.g. an application querying the existence 
221         # of a file or folder prior to its creation, and another 
222         # application creating the same file or folder in between.
223         self._node_lock = threading.Lock()
224     
225     def log_message(self, msg):
226         return " guid %d - host %s - %s " % (self.guid, 
227                 self.get("hostname"), msg)
228
229     @property
230     def home_dir(self):
231         home = self.get("home") or ""
232         if not home.startswith("/"):
233            home = os.path.join(self._home_dir, home) 
234         return home
235
236     @property
237     def nepi_home(self):
238         return os.path.join(self.home_dir, ".nepi")
239
240     @property
241     def usr_dir(self):
242         return os.path.join(self.nepi_home, "nepi-usr")
243
244     @property
245     def lib_dir(self):
246         return os.path.join(self.usr_dir, "lib")
247
248     @property
249     def bin_dir(self):
250         return os.path.join(self.usr_dir, "bin")
251
252     @property
253     def src_dir(self):
254         return os.path.join(self.usr_dir, "src")
255
256     @property
257     def share_dir(self):
258         return os.path.join(self.usr_dir, "share")
259
260     @property
261     def exp_dir(self):
262         return os.path.join(self.nepi_home, "nepi-exp")
263
264     @property
265     def exp_home(self):
266         return os.path.join(self.exp_dir, self.ec.exp_id)
267
268     @property
269     def node_home(self):
270         return os.path.join(self.exp_home, "node-%d" % self.guid)
271
272     @property
273     def run_home(self):
274         return os.path.join(self.node_home, self.ec.run_id)
275
276     @property
277     def os(self):
278         if self._os:
279             return self._os
280
281         if self.get("hostname") not in ["localhost", "127.0.0.1"] and \
282                 not self.get("username"):
283             msg = "Can't resolve OS, insufficient data "
284             self.error(msg)
285             raise RuntimeError, msg
286
287         out = self.get_os()
288
289         if out.find("Fedora release 8") == 0:
290             self._os = OSType.FEDORA_8
291         elif out.find("Fedora release 12") == 0:
292             self._os = OSType.FEDORA_12
293         elif out.find("Fedora release 14") == 0:
294             self._os = OSType.FEDORA_14
295         elif out.find("Fedora release") == 0:
296             self._os = OSType.FEDORA
297         elif out.find("Debian") == 0: 
298             self._os = OSType.DEBIAN
299         elif out.find("Ubuntu") ==0:
300             self._os = OSType.UBUNTU
301         else:
302             msg = "Unsupported OS"
303             self.error(msg, out)
304             raise RuntimeError, "%s - %s " %( msg, out )
305
306         return self._os
307
308     def get_os(self):
309         # The underlying SSH layer will sometimes return an empty
310         # output (even if the command was executed without errors).
311         # To work arround this, repeat the operation N times or
312         # until the result is not empty string
313         out = ""
314         try:
315             (out, err), proc = self.execute("cat /etc/issue", 
316                     with_lock = True,
317                     blocking = True)
318         except:
319             trace = traceback.format_exc()
320             msg = "Error detecting OS: %s " % trace
321             self.error(msg, out, err)
322         
323         return out
324
325     @property
326     def use_deb(self):
327         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
328
329     @property
330     def use_rpm(self):
331         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
332                 OSType.FEDORA]
333
334     @property
335     def localhost(self):
336         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
337
338     def do_provision(self):
339         # check if host is alive
340         if not self.is_alive():
341             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
342             self.error(msg)
343             raise RuntimeError, msg
344
345         self.find_home()
346
347         if self.get("cleanProcesses"):
348             self.clean_processes()
349
350         if self.get("cleanHome"):
351             self.clean_home()
352  
353         if self.get("cleanExperiment"):
354             self.clean_experiment()
355     
356         # Create shared directory structure and node home directory
357         paths = [self.lib_dir, 
358             self.bin_dir, 
359             self.src_dir, 
360             self.share_dir, 
361             self.node_home]
362
363         self.mkdir(paths)
364
365         super(LinuxNode, self).do_provision()
366
367     def do_deploy(self):
368         if self.state == ResourceState.NEW:
369             self.info("Deploying node")
370             self.do_discover()
371             self.do_provision()
372
373         # Node needs to wait until all associated interfaces are 
374         # ready before it can finalize deployment
375         from nepi.resources.linux.interface import LinuxInterface
376         ifaces = self.get_connected(LinuxInterface.get_rtype())
377         for iface in ifaces:
378             if iface.state < ResourceState.READY:
379                 self.ec.schedule(reschedule_delay, self.deploy)
380                 return 
381
382         super(LinuxNode, self).do_deploy()
383
384     def do_release(self):
385         rms = self.get_connected()
386         for rm in rms:
387             # Node needs to wait until all associated RMs are released
388             # before it can be released
389             if rm.state != ResourceState.RELEASED:
390                 self.ec.schedule(reschedule_delay, self.release)
391                 return 
392
393         tear_down = self.get("tearDown")
394         if tear_down:
395             self.execute(tear_down)
396
397         self.clean_processes()
398
399         super(LinuxNode, self).do_release()
400
401     def valid_connection(self, guid):
402         # TODO: Validate!
403         return True
404
405     def clean_processes(self):
406         self.info("Cleaning up processes")
407  
408         if self.get("hostname") in ["localhost", "127.0.0.2"]:
409             return 
410         
411         if self.get("username") != 'root':
412             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
413                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
414         else:
415             if self.state >= ResourceState.READY:
416                 import pickle
417                 pids = pickle.load(open("/tmp/save.proc", "rb"))
418                 pids_temp = dict()
419                 ps_aux = "ps aux |awk '{print $2,$11}'"
420                 (out, err), proc = self.execute(ps_aux)
421                 if len(out) != 0:
422                     for line in out.strip().split("\n"):
423                         parts = line.strip().split(" ")
424                         pids_temp[parts[0]] = parts[1]
425                     kill_pids = set(pids_temp.items()) - set(pids.items())
426                     kill_pids = ' '.join(dict(kill_pids).keys())
427
428                     cmd = ("killall tcpdump || /bin/true ; " +
429                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
430                         "kill %s || /bin/true ; " % kill_pids)
431                 else:
432                     cmd = ("killall tcpdump || /bin/true ; " +
433                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
434             else:
435                 cmd = ("killall tcpdump || /bin/true ; " +
436                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
437
438         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
439
440     def clean_home(self):
441         """ Cleans all NEPI related folders in the Linux host
442         """
443         self.info("Cleaning up home")
444         
445         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
446                 self.home_dir )
447
448         return self.execute(cmd, with_lock = True)
449
450     def clean_experiment(self):
451         """ Cleans all experiment related files in the Linux host.
452         It preserves NEPI files and folders that have a multi experiment
453         scope.
454         """
455         self.info("Cleaning up experiment files")
456         
457         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
458                 self.exp_dir,
459                 self.ec.exp_id )
460             
461         return self.execute(cmd, with_lock = True)
462
463     def execute(self, command,
464             sudo = False,
465             env = None,
466             tty = False,
467             forward_x11 = False,
468             retry = 3,
469             connect_timeout = 30,
470             strict_host_checking = False,
471             persistent = True,
472             blocking = True,
473             with_lock = False
474             ):
475         """ Notice that this invocation will block until the
476         execution finishes. If this is not the desired behavior,
477         use 'run' instead."""
478
479         if self.localhost:
480             (out, err), proc = execfuncs.lexec(command, 
481                     user = self.get("username"), # still problem with localhost
482                     sudo = sudo,
483                     env = env)
484         else:
485             if with_lock:
486                 # If the execute command is blocking, we don't want to keep
487                 # the node lock. This lock is used to avoid race conditions
488                 # when creating the ControlMaster sockets. A more elegant
489                 # solution is needed.
490                 with self._node_lock:
491                     (out, err), proc = sshfuncs.rexec(
492                         command, 
493                         host = self.get("hostname"),
494                         user = self.get("username"),
495                         port = self.get("port"),
496                         gwuser = self.get("gatewayUser"),
497                         gw = self.get("gateway"),
498                         agent = True,
499                         sudo = sudo,
500                         identity = self.get("identity"),
501                         server_key = self.get("serverKey"),
502                         env = env,
503                         tty = tty,
504                         forward_x11 = forward_x11,
505                         retry = retry,
506                         connect_timeout = connect_timeout,
507                         persistent = persistent,
508                         blocking = blocking, 
509                         strict_host_checking = strict_host_checking
510                         )
511             else:
512                 (out, err), proc = sshfuncs.rexec(
513                     command, 
514                     host = self.get("hostname"),
515                     user = self.get("username"),
516                     port = self.get("port"),
517                     gwuser = self.get("gatewayUser"),
518                     gw = self.get("gateway"),
519                     agent = True,
520                     sudo = sudo,
521                     identity = self.get("identity"),
522                     server_key = self.get("serverKey"),
523                     env = env,
524                     tty = tty,
525                     forward_x11 = forward_x11,
526                     retry = retry,
527                     connect_timeout = connect_timeout,
528                     persistent = persistent,
529                     blocking = blocking, 
530                     strict_host_checking = strict_host_checking
531                     )
532
533         return (out, err), proc
534
535     def run(self, command, home,
536             create_home = False,
537             pidfile = 'pidfile',
538             stdin = None, 
539             stdout = 'stdout', 
540             stderr = 'stderr', 
541             sudo = False,
542             tty = False):
543         
544         self.debug("Running command '%s'" % command)
545         
546         if self.localhost:
547             (out, err), proc = execfuncs.lspawn(command, pidfile,
548                     home = home, 
549                     create_home = create_home, 
550                     stdin = stdin or '/dev/null',
551                     stdout = stdout or '/dev/null',
552                     stderr = stderr or '/dev/null',
553                     sudo = sudo) 
554         else:
555             with self._node_lock:
556                 (out, err), proc = sshfuncs.rspawn(
557                     command,
558                     pidfile = pidfile,
559                     home = home,
560                     create_home = create_home,
561                     stdin = stdin or '/dev/null',
562                     stdout = stdout or '/dev/null',
563                     stderr = stderr or '/dev/null',
564                     sudo = sudo,
565                     host = self.get("hostname"),
566                     user = self.get("username"),
567                     port = self.get("port"),
568                     gwuser = self.get("gatewayUser"),
569                     gw = self.get("gateway"),
570                     agent = True,
571                     identity = self.get("identity"),
572                     server_key = self.get("serverKey"),
573                     tty = tty
574                     )
575
576         return (out, err), proc
577
578     def getpid(self, home, pidfile = "pidfile"):
579         if self.localhost:
580             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
581         else:
582             with self._node_lock:
583                 pidtuple = sshfuncs.rgetpid(
584                     os.path.join(home, pidfile),
585                     host = self.get("hostname"),
586                     user = self.get("username"),
587                     port = self.get("port"),
588                     gwuser = self.get("gatewayUser"),
589                     gw = self.get("gateway"),
590                     agent = True,
591                     identity = self.get("identity"),
592                     server_key = self.get("serverKey")
593                     )
594         
595         return pidtuple
596
597     def status(self, pid, ppid):
598         if self.localhost:
599             status = execfuncs.lstatus(pid, ppid)
600         else:
601             with self._node_lock:
602                 status = sshfuncs.rstatus(
603                         pid, ppid,
604                         host = self.get("hostname"),
605                         user = self.get("username"),
606                         port = self.get("port"),
607                         gwuser = self.get("gatewayUser"),
608                         gw = self.get("gateway"),
609                         agent = True,
610                         identity = self.get("identity"),
611                         server_key = self.get("serverKey")
612                         )
613            
614         return status
615     
616     def kill(self, pid, ppid, sudo = False):
617         out = err = ""
618         proc = None
619         status = self.status(pid, ppid)
620
621         if status == sshfuncs.ProcStatus.RUNNING:
622             if self.localhost:
623                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
624             else:
625                 with self._node_lock:
626                     (out, err), proc = sshfuncs.rkill(
627                         pid, ppid,
628                         host = self.get("hostname"),
629                         user = self.get("username"),
630                         port = self.get("port"),
631                         gwuser = self.get("gatewayUser"),
632                         gw = self.get("gateway"),
633                         agent = True,
634                         sudo = sudo,
635                         identity = self.get("identity"),
636                         server_key = self.get("serverKey")
637                         )
638
639         return (out, err), proc
640
641     def copy(self, src, dst):
642         if self.localhost:
643             (out, err), proc = execfuncs.lcopy(src, dst, 
644                     recursive = True)
645         else:
646             with self._node_lock:
647                 (out, err), proc = sshfuncs.rcopy(
648                     src, dst, 
649                     port = self.get("port"),
650                     gwuser = self.get("gatewayUser"),
651                     gw = self.get("gateway"),
652                     identity = self.get("identity"),
653                     server_key = self.get("serverKey"),
654                     recursive = True,
655                     strict_host_checking = False)
656
657         return (out, err), proc
658
659     def upload(self, src, dst, text = False, overwrite = True,
660             raise_on_error = True):
661         """ Copy content to destination
662
663         src  string with the content to copy. Can be:
664             - plain text
665             - a string with the path to a local file
666             - a string with a semi-colon separeted list of local files
667             - a string with a local directory
668
669         dst  string with destination path on the remote host (remote is 
670             always self.host)
671
672         text src is text input, it must be stored into a temp file before 
673         uploading
674         """
675         # If source is a string input 
676         f = None
677         if text and not os.path.isfile(src):
678             # src is text input that should be uploaded as file
679             # create a temporal file with the content to upload
680             f = tempfile.NamedTemporaryFile(delete=False)
681             f.write(src)
682             f.close()
683             src = f.name
684
685         # If dst files should not be overwritten, check that the files do not
686         # exits already
687         if isinstance(src, str):
688             src = map(str.strip, src.split(";"))
689     
690         if overwrite == False:
691             src = self.filter_existing_files(src, dst)
692             if not src:
693                 return ("", ""), None
694
695         if not self.localhost:
696             # Build destination as <user>@<server>:<path>
697             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
698
699         ((out, err), proc) = self.copy(src, dst)
700
701         # clean up temp file
702         if f:
703             os.remove(f.name)
704
705         if err:
706             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
707             self.error(msg, out, err)
708             
709             msg = "%s out: %s err: %s" % (msg, out, err)
710             if raise_on_error:
711                 raise RuntimeError, msg
712
713         return ((out, err), proc)
714
715     def download(self, src, dst, raise_on_error = True):
716         if not self.localhost:
717             # Build destination as <user>@<server>:<path>
718             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
719
720         ((out, err), proc) = self.copy(src, dst)
721
722         if err:
723             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
724             self.error(msg, out, err)
725
726             if raise_on_error:
727                 raise RuntimeError, msg
728
729         return ((out, err), proc)
730
731     def install_packages_command(self, packages):
732         command = ""
733         if self.use_rpm:
734             command = rpmfuncs.install_packages_command(self.os, packages)
735         elif self.use_deb:
736             command = debfuncs.install_packages_command(self.os, packages)
737         else:
738             msg = "Error installing packages ( OS not known ) "
739             self.error(msg, self.os)
740             raise RuntimeError, msg
741
742         return command
743
744     def install_packages(self, packages, home, run_home = None,
745             raise_on_error = True):
746         """ Install packages in the Linux host.
747
748         'home' is the directory to upload the package installation script.
749         'run_home' is the directory from where to execute the script.
750         """
751         command = self.install_packages_command(packages)
752
753         run_home = run_home or home
754
755         (out, err), proc = self.run_and_wait(command, run_home, 
756             shfile = os.path.join(home, "instpkg.sh"),
757             pidfile = "instpkg_pidfile",
758             ecodefile = "instpkg_exitcode",
759             stdout = "instpkg_stdout", 
760             stderr = "instpkg_stderr",
761             overwrite = False,
762             raise_on_error = raise_on_error)
763
764         return (out, err), proc 
765
766     def remove_packages(self, packages, home, run_home = None,
767             raise_on_error = True):
768         """ Uninstall packages from the Linux host.
769
770         'home' is the directory to upload the package un-installation script.
771         'run_home' is the directory from where to execute the script.
772         """
773         if self.use_rpm:
774             command = rpmfuncs.remove_packages_command(self.os, packages)
775         elif self.use_deb:
776             command = debfuncs.remove_packages_command(self.os, packages)
777         else:
778             msg = "Error removing packages ( OS not known ) "
779             self.error(msg)
780             raise RuntimeError, msg
781
782         run_home = run_home or home
783
784         (out, err), proc = self.run_and_wait(command, run_home, 
785             shfile = os.path.join(home, "rmpkg.sh"),
786             pidfile = "rmpkg_pidfile",
787             ecodefile = "rmpkg_exitcode",
788             stdout = "rmpkg_stdout", 
789             stderr = "rmpkg_stderr",
790             overwrite = False,
791             raise_on_error = raise_on_error)
792          
793         return (out, err), proc 
794
795     def mkdir(self, paths, clean = False):
796         """ Paths is either a single remote directory path to create,
797         or a list of directories to create.
798         """
799         if clean:
800             self.rmdir(paths)
801
802         if isinstance(paths, str):
803             paths = [paths]
804
805         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
806
807         return self.execute(cmd, with_lock = True)
808
809     def rmdir(self, paths):
810         """ Paths is either a single remote directory path to delete,
811         or a list of directories to delete.
812         """
813
814         if isinstance(paths, str):
815             paths = [paths]
816
817         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
818
819         return self.execute(cmd, with_lock = True)
820         
821     def run_and_wait(self, command, home, 
822             shfile = "cmd.sh",
823             env = None,
824             overwrite = True,
825             pidfile = "pidfile", 
826             ecodefile = "exitcode", 
827             stdin = None, 
828             stdout = "stdout", 
829             stderr = "stderr", 
830             sudo = False,
831             tty = False,
832             raise_on_error = True):
833         """
834         Uploads the 'command' to a bash script in the host.
835         Then runs the script detached in background in the host, and
836         busy-waites until the script finishes executing.
837         """
838
839         if not shfile.startswith("/"):
840             shfile = os.path.join(home, shfile)
841
842         self.upload_command(command, 
843             shfile = shfile, 
844             ecodefile = ecodefile, 
845             env = env,
846             overwrite = overwrite)
847
848         command = "bash %s" % shfile
849         # run command in background in remote host
850         (out, err), proc = self.run(command, home, 
851                 pidfile = pidfile,
852                 stdin = stdin, 
853                 stdout = stdout, 
854                 stderr = stderr, 
855                 sudo = sudo,
856                 tty = tty)
857
858         # check no errors occurred
859         if proc.poll():
860             msg = " Failed to run command '%s' " % command
861             self.error(msg, out, err)
862             if raise_on_error:
863                 raise RuntimeError, msg
864
865         # Wait for pid file to be generated
866         pid, ppid = self.wait_pid(
867                 home = home, 
868                 pidfile = pidfile, 
869                 raise_on_error = raise_on_error)
870
871         # wait until command finishes to execute
872         self.wait_run(pid, ppid)
873       
874         (eout, err), proc = self.check_errors(home,
875             ecodefile = ecodefile,
876             stderr = stderr)
877
878         # Out is what was written in the stderr file
879         if err:
880             msg = " Failed to run command '%s' " % command
881             self.error(msg, eout, err)
882
883             if raise_on_error:
884                 raise RuntimeError, msg
885
886         (out, oerr), proc = self.check_output(home, stdout)
887         
888         return (out, err), proc
889
890     def exitcode(self, home, ecodefile = "exitcode"):
891         """
892         Get the exit code of an application.
893         Returns an integer value with the exit code 
894         """
895         (out, err), proc = self.check_output(home, ecodefile)
896
897         # Succeeded to open file, return exit code in the file
898         if proc.wait() == 0:
899             try:
900                 return int(out.strip())
901             except:
902                 # Error in the content of the file!
903                 return ExitCode.CORRUPTFILE
904
905         # No such file or directory
906         if proc.returncode == 1:
907             return ExitCode.FILENOTFOUND
908         
909         # Other error from 'cat'
910         return ExitCode.ERROR
911
912     def upload_command(self, command, 
913             shfile = "cmd.sh",
914             ecodefile = "exitcode",
915             overwrite = True,
916             env = None):
917         """ Saves the command as a bash script file in the remote host, and
918         forces to save the exit code of the command execution to the ecodefile
919         """
920
921         if not (command.strip().endswith(";") or command.strip().endswith("&")):
922             command += ";"
923       
924         # The exit code of the command will be stored in ecodefile
925         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
926                 'command': command,
927                 'ecodefile': ecodefile,
928                 } 
929
930         # Export environment
931         environ = self.format_environment(env)
932
933         # Add environ to command
934         command = environ + command
935
936         return self.upload(command, shfile, text = True, overwrite = overwrite)
937
938     def format_environment(self, env, inline = False):
939         """ Formats the environment variables for a command to be executed
940         either as an inline command
941         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
942         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
943         """
944         if not env: return ""
945
946         # Remove extra white spaces
947         env = re.sub(r'\s+', ' ', env.strip())
948
949         sep = ";" if inline else "\n"
950         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
951
952     def check_errors(self, home, 
953             ecodefile = "exitcode", 
954             stderr = "stderr"):
955         """ Checks whether errors occurred while running a command.
956         It first checks the exit code for the command, and only if the
957         exit code is an error one it returns the error output.
958
959         """
960         proc = None
961         err = ""
962
963         # get exit code saved in the 'exitcode' file
964         ecode = self.exitcode(home, ecodefile)
965
966         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
967             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
968         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
969             # The process returned an error code or didn't exist. 
970             # Check standard error.
971             (err, eerr), proc = self.check_output(home, stderr)
972
973             # If the stderr file was not found, assume nothing bad happened,
974             # and just ignore the error.
975             # (cat returns 1 for error "No such file or directory")
976             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
977                 err = "" 
978             
979         return ("", err), proc
980  
981     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
982         """ Waits until the pid file for the command is generated, 
983             and returns the pid and ppid of the process """
984         pid = ppid = None
985         delay = 1.0
986
987         for i in xrange(2):
988             pidtuple = self.getpid(home = home, pidfile = pidfile)
989             
990             if pidtuple:
991                 pid, ppid = pidtuple
992                 break
993             else:
994                 time.sleep(delay)
995                 delay = delay * 1.5
996         else:
997             msg = " Failed to get pid for pidfile %s/%s " % (
998                     home, pidfile )
999             self.error(msg)
1000             
1001             if raise_on_error:
1002                 raise RuntimeError, msg
1003
1004         return pid, ppid
1005
1006     def wait_run(self, pid, ppid, trial = 0):
1007         """ wait for a remote process to finish execution """
1008         delay = 1.0
1009
1010         while True:
1011             status = self.status(pid, ppid)
1012             
1013             if status is ProcStatus.FINISHED:
1014                 break
1015             elif status is not ProcStatus.RUNNING:
1016                 delay = delay * 1.5
1017                 time.sleep(delay)
1018                 # If it takes more than 20 seconds to start, then
1019                 # asume something went wrong
1020                 if delay > 20:
1021                     break
1022             else:
1023                 # The app is running, just wait...
1024                 time.sleep(0.5)
1025
1026     def check_output(self, home, filename):
1027         """ Retrives content of file """
1028         (out, err), proc = self.execute("cat %s" % 
1029             os.path.join(home, filename), retry = 1, with_lock = True)
1030         return (out, err), proc
1031
1032     def is_alive(self):
1033         """ Checks if host is responsive
1034         """
1035         if self.localhost:
1036             return True
1037
1038         out = err = ""
1039         msg = "Unresponsive host. Wrong answer. "
1040
1041         # The underlying SSH layer will sometimes return an empty
1042         # output (even if the command was executed without errors).
1043         # To work arround this, repeat the operation N times or
1044         # until the result is not empty string
1045         try:
1046             (out, err), proc = self.execute("echo 'ALIVE'",
1047                     blocking = True,
1048                     with_lock = True)
1049     
1050             if out.find("ALIVE") > -1:
1051                 return True
1052         except:
1053             trace = traceback.format_exc()
1054             msg = "Unresponsive host. Error reaching host: %s " % trace
1055
1056         self.error(msg, out, err)
1057         return False
1058
1059     def find_home(self):
1060         """ Retrieves host home directory
1061         """
1062         # The underlying SSH layer will sometimes return an empty
1063         # output (even if the command was executed without errors).
1064         # To work arround this, repeat the operation N times or
1065         # until the result is not empty string
1066         msg = "Impossible to retrieve HOME directory"
1067         try:
1068             (out, err), proc = self.execute("echo ${HOME}",
1069                     blocking = True,
1070                     with_lock = True)
1071     
1072             if out.strip() != "":
1073                 self._home_dir =  out.strip()
1074         except:
1075             trace = traceback.format_exc()
1076             msg = "Impossible to retrieve HOME directory %s" % trace
1077
1078         if not self._home_dir:
1079             self.error(msg)
1080             raise RuntimeError, msg
1081
1082     def filter_existing_files(self, src, dst):
1083         """ Removes files that already exist in the Linux host from src list
1084         """
1085         # construct a dictionary with { dst: src }
1086         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1087                     if len(src) > 1 else dict({dst: src[0]})
1088
1089         command = []
1090         for d in dests.keys():
1091             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1092
1093         command = ";".join(command)
1094
1095         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1096     
1097         for d in dests.keys():
1098             if out.find(d) > -1:
1099                 del dests[d]
1100
1101         if not dests:
1102             return []
1103
1104         return dests.values()
1105