Fix bug strict host checking
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import socket
32 import tempfile
33 import time
34 import threading
35 import traceback
36
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!! 
39
40 class ExitCode:
41     """
42     Error codes that the rexitcode function can return if unable to
43     check the exit code of a spawned process
44     """
45     FILENOTFOUND = -1
46     CORRUPTFILE = -2
47     ERROR = -3
48     OK = 0
49
50 class OSType:
51     """
52     Supported flavors of Linux OS
53     """
54     FEDORA_8 = "f8"
55     FEDORA_12 = "f12"
56     FEDORA_14 = "f14"
57     FEDORA = "fedora"
58     UBUNTU = "ubuntu"
59     DEBIAN = "debian"
60
61 @clsinit_copy
62 class LinuxNode(ResourceManager):
63     """
64     .. class:: Class Args :
65       
66         :param ec: The Experiment controller
67         :type ec: ExperimentController
68         :param guid: guid of the RM
69         :type guid: int
70
71     .. note::
72
73         There are different ways in which commands can be executed using the
74         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
75         'run_and_wait'). 
76         
77         Brief explanation:
78
79             * 'execute' (blocking mode) :  
80
81                      HOW IT WORKS: 'execute', forks a process and run the
82                      command, synchronously, attached to the terminal, in
83                      foreground.
84                      The execute method will block until the command returns
85                      the result on 'out', 'err' (so until it finishes executing).
86   
87                      USAGE: short-lived commands that must be executed attached
88                      to a terminal and in foreground, for which it IS necessary
89                      to block until the command has finished (e.g. if you want
90                      to run 'ls' or 'cat').
91
92             * 'execute' (NON blocking mode - blocking = False) :
93
94                     HOW IT WORKS: Same as before, except that execute method
95                     will return immediately (even if command still running).
96
97                     USAGE: long-lived commands that must be executed attached
98                     to a terminal and in foreground, but for which it is not
99                     necessary to block until the command has finished. (e.g.
100                     start an application using X11 forwarding)
101
102              * 'run' :
103
104                    HOW IT WORKS: Connects to the host ( using SSH if remote)
105                    and launches the command in background, detached from any
106                    terminal (daemonized), and returns. The command continues to
107                    run remotely, but since it is detached from the terminal,
108                    its pipes (stdin, stdout, stderr) can't be redirected to the
109                    console (as normal non detached processes would), and so they
110                    are explicitly redirected to files. The pidfile is created as
111                    part of the process of launching the command. The pidfile
112                    holds the pid and ppid of the process forked in background,
113                    so later on it is possible to check whether the command is still
114                    running.
115
116                     USAGE: long-lived commands that can run detached in background,
117                     for which it is NOT necessary to block (wait) until the command
118                     has finished. (e.g. start an application that is not using X11
119                     forwarding. It can run detached and remotely in background)
120
121              * 'run_and_wait' :
122
123                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124                     the command has finished execution. It also checks whether
125                     errors occurred during runtime by reading the exitcode file,
126                     which contains the exit code of the command that was run
127                     (checking stderr only is not always reliable since many
128                     commands throw debugging info to stderr and the only way to
129                     automatically know whether an error really happened is to
130                     check the process exit code).
131
132                     Another difference with respect to 'run', is that instead
133                     of directly executing the command as a bash command line,
134                     it uploads the command to a bash script and runs the script.
135                     This allows to use the bash script to debug errors, since
136                     it remains at the remote host and can be run manually to
137                     reproduce the error.
138                   
139                     USAGE: medium-lived commands that can run detached in
140                     background, for which it IS necessary to block (wait) until
141                     the command has finished. (e.g. Package installation,
142                     source compilation, file download, etc)
143
144     """
145     _rtype = "LinuxNode"
146     _help = "Controls Linux host machines ( either localhost or a host " \
147             "that can be accessed using a SSH key)"
148     _backend_type = "linux"
149
150     @classmethod
151     def _register_attributes(cls):
152         hostname = Attribute("hostname", "Hostname of the machine",
153                 flags = Flags.Design)
154
155         username = Attribute("username", "Local account username", 
156                 flags = Flags.Credential)
157
158         port = Attribute("port", "SSH port", flags = Flags.Design)
159         
160         home = Attribute("home",
161                 "Experiment home directory to store all experiment related files",
162                 flags = Flags.Design)
163         
164         identity = Attribute("identity", "SSH identity file",
165                 flags = Flags.Credential)
166         
167         server_key = Attribute("serverKey", "Server public key", 
168                 flags = Flags.Design)
169         
170         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
171                 " from node home folder before starting experiment", 
172                 type = Types.Bool,
173                 default = False,
174                 flags = Flags.Design)
175
176         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
177                 " from a previous same experiment, before the new experiment starts", 
178                 type = Types.Bool,
179                 default = False,
180                 flags = Flags.Design)
181         
182         clean_processes = Attribute("cleanProcesses", 
183                 "Kill all running processes before starting experiment",
184                 type = Types.Bool,
185                 default = False,
186                 flags = Flags.Design)
187         
188         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
189                 "releasing the resource",
190                 flags = Flags.Design)
191
192         gateway_user = Attribute("gatewayUser", "Gateway account username",
193                 flags = Flags.Design)
194
195         gateway = Attribute("gateway", "Hostname of the gateway machine",
196                 flags = Flags.Design)
197
198         ip = Attribute("ip", "Linux host public IP address. "
199                    "Must not be modified by the user unless hostname is 'localhost'",
200                     flags = Flags.Design)
201
202         cls._register_attribute(hostname)
203         cls._register_attribute(username)
204         cls._register_attribute(port)
205         cls._register_attribute(home)
206         cls._register_attribute(identity)
207         cls._register_attribute(server_key)
208         cls._register_attribute(clean_home)
209         cls._register_attribute(clean_experiment)
210         cls._register_attribute(clean_processes)
211         cls._register_attribute(tear_down)
212         cls._register_attribute(gateway_user)
213         cls._register_attribute(gateway)
214         cls._register_attribute(ip)
215
216     def __init__(self, ec, guid):
217         super(LinuxNode, self).__init__(ec, guid)
218         self._os = None
219         # home directory at Linux host
220         self._home_dir = ""
221
222         # lock to prevent concurrent applications on the same node,
223         # to execute commands at the same time. There are potential
224         # concurrency issues when using SSH to a same host from 
225         # multiple threads. There are also possible operational 
226         # issues, e.g. an application querying the existence 
227         # of a file or folder prior to its creation, and another 
228         # application creating the same file or folder in between.
229         self._node_lock = threading.Lock()
230     
231     def log_message(self, msg):
232         return " guid %d - host %s - %s " % (self.guid, 
233                 self.get("hostname"), msg)
234
235     @property
236     def home_dir(self):
237         home = self.get("home") or ""
238         if not home.startswith("/"):
239            home = os.path.join(self._home_dir, home) 
240         return home
241
242     @property
243     def nepi_home(self):
244         return os.path.join(self.home_dir, ".nepi")
245
246     @property
247     def usr_dir(self):
248         return os.path.join(self.nepi_home, "nepi-usr")
249
250     @property
251     def lib_dir(self):
252         return os.path.join(self.usr_dir, "lib")
253
254     @property
255     def bin_dir(self):
256         return os.path.join(self.usr_dir, "bin")
257
258     @property
259     def src_dir(self):
260         return os.path.join(self.usr_dir, "src")
261
262     @property
263     def share_dir(self):
264         return os.path.join(self.usr_dir, "share")
265
266     @property
267     def exp_dir(self):
268         return os.path.join(self.nepi_home, "nepi-exp")
269
270     @property
271     def exp_home(self):
272         return os.path.join(self.exp_dir, self.ec.exp_id)
273
274     @property
275     def node_home(self):
276         return os.path.join(self.exp_home, "node-%d" % self.guid)
277
278     @property
279     def run_home(self):
280         return os.path.join(self.node_home, self.ec.run_id)
281
282     @property
283     def os(self):
284         if self._os:
285             return self._os
286
287         if not self.localhost and not self.get("username"):
288             msg = "Can't resolve OS, insufficient data "
289             self.error(msg)
290             raise RuntimeError, msg
291
292         out = self.get_os()
293
294         if out.find("Fedora release 8") == 0:
295             self._os = OSType.FEDORA_8
296         elif out.find("Fedora release 12") == 0:
297             self._os = OSType.FEDORA_12
298         elif out.find("Fedora release 14") == 0:
299             self._os = OSType.FEDORA_14
300         elif out.find("Fedora release") == 0:
301             self._os = OSType.FEDORA
302         elif out.find("Debian") == 0: 
303             self._os = OSType.DEBIAN
304         elif out.find("Ubuntu") ==0:
305             self._os = OSType.UBUNTU
306         else:
307             msg = "Unsupported OS"
308             self.error(msg, out)
309             raise RuntimeError, "%s - %s " %( msg, out )
310
311         return self._os
312
313     def get_os(self):
314         # The underlying SSH layer will sometimes return an empty
315         # output (even if the command was executed without errors).
316         # To work arround this, repeat the operation N times or
317         # until the result is not empty string
318         out = ""
319         try:
320             (out, err), proc = self.execute("cat /etc/issue", 
321                     with_lock = True,
322                     blocking = True)
323         except:
324             trace = traceback.format_exc()
325             msg = "Error detecting OS: %s " % trace
326             self.error(msg, out, err)
327         
328         return out
329
330     @property
331     def use_deb(self):
332         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
333
334     @property
335     def use_rpm(self):
336         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
337                 OSType.FEDORA]
338
339     @property
340     def localhost(self):
341         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
342
343     def do_provision(self):
344         # check if host is alive
345         if not self.is_alive():
346             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
347             self.error(msg)
348             raise RuntimeError, msg
349
350         self.find_home()
351
352         if self.get("cleanProcesses"):
353             self.clean_processes()
354
355         if self.get("cleanHome"):
356             self.clean_home()
357  
358         if self.get("cleanExperiment"):
359             self.clean_experiment()
360     
361         # Create shared directory structure and node home directory
362         paths = [self.lib_dir, 
363             self.bin_dir, 
364             self.src_dir, 
365             self.share_dir, 
366             self.node_home]
367
368         self.mkdir(paths)
369
370         # Get Public IP address
371         if not self.get("ip"):
372             if self.localhost:
373                 ip = socket.gethostbyname(socket.gethostname())
374             else:
375                 ip = socket.gethostbyname(self.get("hostname"))
376
377             self.set("ip", ip)
378
379         super(LinuxNode, self).do_provision()
380
381     def do_deploy(self):
382         if self.state == ResourceState.NEW:
383             self.info("Deploying node")
384             self.do_discover()
385             self.do_provision()
386
387         # Node needs to wait until all associated interfaces are 
388         # ready before it can finalize deployment
389         from nepi.resources.linux.interface import LinuxInterface
390         ifaces = self.get_connected(LinuxInterface.get_rtype())
391         for iface in ifaces:
392             if iface.state < ResourceState.READY:
393                 self.ec.schedule(reschedule_delay, self.deploy)
394                 return 
395
396         super(LinuxNode, self).do_deploy()
397
398     def do_release(self):
399         rms = self.get_connected()
400         for rm in rms:
401             # Node needs to wait until all associated RMs are released
402             # before it can be released
403             if rm.state != ResourceState.RELEASED:
404                 self.ec.schedule(reschedule_delay, self.release)
405                 return 
406
407         tear_down = self.get("tearDown")
408         if tear_down:
409             self.execute(tear_down)
410
411         self.clean_processes()
412
413         super(LinuxNode, self).do_release()
414
415     def valid_connection(self, guid):
416         # TODO: Validate!
417         return True
418
419     def clean_processes(self):
420         self.info("Cleaning up processes")
421
422         if self.localhost:
423             return 
424         
425         if self.get("username") != 'root':
426             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
427                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
428         else:
429             if self.state >= ResourceState.READY:
430                 import pickle
431                 pids = pickle.load(open("/tmp/save.proc", "rb"))
432                 pids_temp = dict()
433                 ps_aux = "ps aux |awk '{print $2,$11}'"
434                 (out, err), proc = self.execute(ps_aux)
435                 if len(out) != 0:
436                     for line in out.strip().split("\n"):
437                         parts = line.strip().split(" ")
438                         pids_temp[parts[0]] = parts[1]
439                     kill_pids = set(pids_temp.items()) - set(pids.items())
440                     kill_pids = ' '.join(dict(kill_pids).keys())
441
442                     cmd = ("killall tcpdump || /bin/true ; " +
443                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
444                         "kill %s || /bin/true ; " % kill_pids)
445                 else:
446                     cmd = ("killall tcpdump || /bin/true ; " +
447                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
448             else:
449                 cmd = ("killall tcpdump || /bin/true ; " +
450                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
451
452         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
453
454     def clean_home(self):
455         """ Cleans all NEPI related folders in the Linux host
456         """
457         self.info("Cleaning up home")
458         
459         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
460                 self.home_dir )
461
462         return self.execute(cmd, with_lock = True)
463
464     def clean_experiment(self):
465         """ Cleans all experiment related files in the Linux host.
466         It preserves NEPI files and folders that have a multi experiment
467         scope.
468         """
469         self.info("Cleaning up experiment files")
470         
471         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
472                 self.exp_dir,
473                 self.ec.exp_id )
474             
475         return self.execute(cmd, with_lock = True)
476
477     def execute(self, command,
478             sudo = False,
479             env = None,
480             tty = False,
481             forward_x11 = False,
482             retry = 3,
483             connect_timeout = 30,
484             strict_host_checking = False,
485             persistent = True,
486             blocking = True,
487             with_lock = False
488             ):
489         """ Notice that this invocation will block until the
490         execution finishes. If this is not the desired behavior,
491         use 'run' instead."""
492
493         if self.localhost:
494             (out, err), proc = execfuncs.lexec(command, 
495                     user = self.get("username"), # still problem with localhost
496                     sudo = sudo,
497                     env = env)
498         else:
499             if with_lock:
500                 # If the execute command is blocking, we don't want to keep
501                 # the node lock. This lock is used to avoid race conditions
502                 # when creating the ControlMaster sockets. A more elegant
503                 # solution is needed.
504                 with self._node_lock:
505                     (out, err), proc = sshfuncs.rexec(
506                         command, 
507                         host = self.get("hostname"),
508                         user = self.get("username"),
509                         port = self.get("port"),
510                         gwuser = self.get("gatewayUser"),
511                         gw = self.get("gateway"),
512                         agent = True,
513                         sudo = sudo,
514                         identity = self.get("identity"),
515                         server_key = self.get("serverKey"),
516                         env = env,
517                         tty = tty,
518                         forward_x11 = forward_x11,
519                         retry = retry,
520                         connect_timeout = connect_timeout,
521                         persistent = persistent,
522                         blocking = blocking, 
523                         strict_host_checking = strict_host_checking
524                         )
525             else:
526                 (out, err), proc = sshfuncs.rexec(
527                     command, 
528                     host = self.get("hostname"),
529                     user = self.get("username"),
530                     port = self.get("port"),
531                     gwuser = self.get("gatewayUser"),
532                     gw = self.get("gateway"),
533                     agent = True,
534                     sudo = sudo,
535                     identity = self.get("identity"),
536                     server_key = self.get("serverKey"),
537                     env = env,
538                     tty = tty,
539                     forward_x11 = forward_x11,
540                     retry = retry,
541                     connect_timeout = connect_timeout,
542                     persistent = persistent,
543                     blocking = blocking, 
544                     strict_host_checking = strict_host_checking
545                     )
546
547         return (out, err), proc
548
549     def run(self, command, home,
550             create_home = False,
551             pidfile = 'pidfile',
552             stdin = None, 
553             stdout = 'stdout', 
554             stderr = 'stderr', 
555             sudo = False,
556             tty = False,
557             strict_host_checking = False):
558         
559         self.debug("Running command '%s'" % command)
560         
561         if self.localhost:
562             (out, err), proc = execfuncs.lspawn(command, pidfile,
563                     home = home, 
564                     create_home = create_home, 
565                     stdin = stdin or '/dev/null',
566                     stdout = stdout or '/dev/null',
567                     stderr = stderr or '/dev/null',
568                     sudo = sudo) 
569         else:
570             with self._node_lock:
571                 (out, err), proc = sshfuncs.rspawn(
572                     command,
573                     pidfile = pidfile,
574                     home = home,
575                     create_home = create_home,
576                     stdin = stdin or '/dev/null',
577                     stdout = stdout or '/dev/null',
578                     stderr = stderr or '/dev/null',
579                     sudo = sudo,
580                     host = self.get("hostname"),
581                     user = self.get("username"),
582                     port = self.get("port"),
583                     gwuser = self.get("gatewayUser"),
584                     gw = self.get("gateway"),
585                     agent = True,
586                     identity = self.get("identity"),
587                     server_key = self.get("serverKey"),
588                     tty = tty,
589                     strict_host_checking = strict_host_checking
590                     )
591
592         return (out, err), proc
593
594     def getpid(self, home, pidfile = "pidfile"):
595         if self.localhost:
596             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
597         else:
598             with self._node_lock:
599                 pidtuple = sshfuncs.rgetpid(
600                     os.path.join(home, pidfile),
601                     host = self.get("hostname"),
602                     user = self.get("username"),
603                     port = self.get("port"),
604                     gwuser = self.get("gatewayUser"),
605                     gw = self.get("gateway"),
606                     agent = True,
607                     identity = self.get("identity"),
608                     server_key = self.get("serverKey"),
609                     strict_host_checking = False
610                     )
611         
612         return pidtuple
613
614     def status(self, pid, ppid):
615         if self.localhost:
616             status = execfuncs.lstatus(pid, ppid)
617         else:
618             with self._node_lock:
619                 status = sshfuncs.rstatus(
620                         pid, ppid,
621                         host = self.get("hostname"),
622                         user = self.get("username"),
623                         port = self.get("port"),
624                         gwuser = self.get("gatewayUser"),
625                         gw = self.get("gateway"),
626                         agent = True,
627                         identity = self.get("identity"),
628                         server_key = self.get("serverKey"),
629                         strict_host_checking = False
630                         )
631            
632         return status
633     
634     def kill(self, pid, ppid, sudo = False):
635         out = err = ""
636         proc = None
637         status = self.status(pid, ppid)
638
639         if status == sshfuncs.ProcStatus.RUNNING:
640             if self.localhost:
641                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
642             else:
643                 with self._node_lock:
644                     (out, err), proc = sshfuncs.rkill(
645                         pid, ppid,
646                         host = self.get("hostname"),
647                         user = self.get("username"),
648                         port = self.get("port"),
649                         gwuser = self.get("gatewayUser"),
650                         gw = self.get("gateway"),
651                         agent = True,
652                         sudo = sudo,
653                         identity = self.get("identity"),
654                         server_key = self.get("serverKey"),
655                         strict_host_checking = False
656                         )
657
658         return (out, err), proc
659
660     def copy(self, src, dst):
661         if self.localhost:
662             (out, err), proc = execfuncs.lcopy(src, dst, 
663                     recursive = True)
664         else:
665             with self._node_lock:
666                 (out, err), proc = sshfuncs.rcopy(
667                     src, dst, 
668                     port = self.get("port"),
669                     gwuser = self.get("gatewayUser"),
670                     gw = self.get("gateway"),
671                     identity = self.get("identity"),
672                     server_key = self.get("serverKey"),
673                     recursive = True,
674                     strict_host_checking = False)
675
676         return (out, err), proc
677
678     def upload(self, src, dst, text = False, overwrite = True,
679             raise_on_error = True):
680         """ Copy content to destination
681
682         src  string with the content to copy. Can be:
683             - plain text
684             - a string with the path to a local file
685             - a string with a semi-colon separeted list of local files
686             - a string with a local directory
687
688         dst  string with destination path on the remote host (remote is 
689             always self.host)
690
691         text src is text input, it must be stored into a temp file before 
692         uploading
693         """
694         # If source is a string input 
695         f = None
696         if text and not os.path.isfile(src):
697             # src is text input that should be uploaded as file
698             # create a temporal file with the content to upload
699             f = tempfile.NamedTemporaryFile(delete=False)
700             f.write(src)
701             f.close()
702             src = f.name
703
704         # If dst files should not be overwritten, check that the files do not
705         # exits already
706         if isinstance(src, str):
707             src = map(str.strip, src.split(";"))
708     
709         if overwrite == False:
710             src = self.filter_existing_files(src, dst)
711             if not src:
712                 return ("", ""), None
713
714         if not self.localhost:
715             # Build destination as <user>@<server>:<path>
716             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
717
718         ((out, err), proc) = self.copy(src, dst)
719
720         # clean up temp file
721         if f:
722             os.remove(f.name)
723
724         if err:
725             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
726             self.error(msg, out, err)
727             
728             msg = "%s out: %s err: %s" % (msg, out, err)
729             if raise_on_error:
730                 raise RuntimeError, msg
731
732         return ((out, err), proc)
733
734     def download(self, src, dst, raise_on_error = True):
735         if not self.localhost:
736             # Build destination as <user>@<server>:<path>
737             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
738
739         ((out, err), proc) = self.copy(src, dst)
740
741         if err:
742             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
743             self.error(msg, out, err)
744
745             if raise_on_error:
746                 raise RuntimeError, msg
747
748         return ((out, err), proc)
749
750     def install_packages_command(self, packages):
751         command = ""
752         if self.use_rpm:
753             command = rpmfuncs.install_packages_command(self.os, packages)
754         elif self.use_deb:
755             command = debfuncs.install_packages_command(self.os, packages)
756         else:
757             msg = "Error installing packages ( OS not known ) "
758             self.error(msg, self.os)
759             raise RuntimeError, msg
760
761         return command
762
763     def install_packages(self, packages, home, run_home = None,
764             raise_on_error = True):
765         """ Install packages in the Linux host.
766
767         'home' is the directory to upload the package installation script.
768         'run_home' is the directory from where to execute the script.
769         """
770         command = self.install_packages_command(packages)
771
772         run_home = run_home or home
773
774         (out, err), proc = self.run_and_wait(command, run_home, 
775             shfile = os.path.join(home, "instpkg.sh"),
776             pidfile = "instpkg_pidfile",
777             ecodefile = "instpkg_exitcode",
778             stdout = "instpkg_stdout", 
779             stderr = "instpkg_stderr",
780             overwrite = False,
781             raise_on_error = raise_on_error)
782
783         return (out, err), proc 
784
785     def remove_packages(self, packages, home, run_home = None,
786             raise_on_error = True):
787         """ Uninstall packages from the Linux host.
788
789         'home' is the directory to upload the package un-installation script.
790         'run_home' is the directory from where to execute the script.
791         """
792         if self.use_rpm:
793             command = rpmfuncs.remove_packages_command(self.os, packages)
794         elif self.use_deb:
795             command = debfuncs.remove_packages_command(self.os, packages)
796         else:
797             msg = "Error removing packages ( OS not known ) "
798             self.error(msg)
799             raise RuntimeError, msg
800
801         run_home = run_home or home
802
803         (out, err), proc = self.run_and_wait(command, run_home, 
804             shfile = os.path.join(home, "rmpkg.sh"),
805             pidfile = "rmpkg_pidfile",
806             ecodefile = "rmpkg_exitcode",
807             stdout = "rmpkg_stdout", 
808             stderr = "rmpkg_stderr",
809             overwrite = False,
810             raise_on_error = raise_on_error)
811          
812         return (out, err), proc 
813
814     def mkdir(self, paths, clean = False):
815         """ Paths is either a single remote directory path to create,
816         or a list of directories to create.
817         """
818         if clean:
819             self.rmdir(paths)
820
821         if isinstance(paths, str):
822             paths = [paths]
823
824         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
825
826         return self.execute(cmd, with_lock = True)
827
828     def rmdir(self, paths):
829         """ Paths is either a single remote directory path to delete,
830         or a list of directories to delete.
831         """
832
833         if isinstance(paths, str):
834             paths = [paths]
835
836         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
837
838         return self.execute(cmd, with_lock = True)
839         
840     def run_and_wait(self, command, home, 
841             shfile = "cmd.sh",
842             env = None,
843             overwrite = True,
844             pidfile = "pidfile", 
845             ecodefile = "exitcode", 
846             stdin = None, 
847             stdout = "stdout", 
848             stderr = "stderr", 
849             sudo = False,
850             tty = False,
851             raise_on_error = True):
852         """
853         Uploads the 'command' to a bash script in the host.
854         Then runs the script detached in background in the host, and
855         busy-waites until the script finishes executing.
856         """
857
858         if not shfile.startswith("/"):
859             shfile = os.path.join(home, shfile)
860
861         self.upload_command(command, 
862             shfile = shfile, 
863             ecodefile = ecodefile, 
864             env = env,
865             overwrite = overwrite)
866
867         command = "bash %s" % shfile
868         # run command in background in remote host
869         (out, err), proc = self.run(command, home, 
870                 pidfile = pidfile,
871                 stdin = stdin, 
872                 stdout = stdout, 
873                 stderr = stderr, 
874                 sudo = sudo,
875                 tty = tty)
876
877         # check no errors occurred
878         if proc.poll():
879             msg = " Failed to run command '%s' " % command
880             self.error(msg, out, err)
881             if raise_on_error:
882                 raise RuntimeError, msg
883
884         # Wait for pid file to be generated
885         pid, ppid = self.wait_pid(
886                 home = home, 
887                 pidfile = pidfile, 
888                 raise_on_error = raise_on_error)
889
890         # wait until command finishes to execute
891         self.wait_run(pid, ppid)
892       
893         (eout, err), proc = self.check_errors(home,
894             ecodefile = ecodefile,
895             stderr = stderr)
896
897         # Out is what was written in the stderr file
898         if err:
899             msg = " Failed to run command '%s' " % command
900             self.error(msg, eout, err)
901
902             if raise_on_error:
903                 raise RuntimeError, msg
904
905         (out, oerr), proc = self.check_output(home, stdout)
906         
907         return (out, err), proc
908
909     def exitcode(self, home, ecodefile = "exitcode"):
910         """
911         Get the exit code of an application.
912         Returns an integer value with the exit code 
913         """
914         (out, err), proc = self.check_output(home, ecodefile)
915
916         # Succeeded to open file, return exit code in the file
917         if proc.wait() == 0:
918             try:
919                 return int(out.strip())
920             except:
921                 # Error in the content of the file!
922                 return ExitCode.CORRUPTFILE
923
924         # No such file or directory
925         if proc.returncode == 1:
926             return ExitCode.FILENOTFOUND
927         
928         # Other error from 'cat'
929         return ExitCode.ERROR
930
931     def upload_command(self, command, 
932             shfile = "cmd.sh",
933             ecodefile = "exitcode",
934             overwrite = True,
935             env = None):
936         """ Saves the command as a bash script file in the remote host, and
937         forces to save the exit code of the command execution to the ecodefile
938         """
939
940         if not (command.strip().endswith(";") or command.strip().endswith("&")):
941             command += ";"
942       
943         # The exit code of the command will be stored in ecodefile
944         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
945                 'command': command,
946                 'ecodefile': ecodefile,
947                 } 
948
949         # Export environment
950         environ = self.format_environment(env)
951
952         # Add environ to command
953         command = environ + command
954
955         return self.upload(command, shfile, text = True, overwrite = overwrite)
956
957     def format_environment(self, env, inline = False):
958         """ Formats the environment variables for a command to be executed
959         either as an inline command
960         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
961         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
962         """
963         if not env: return ""
964
965         # Remove extra white spaces
966         env = re.sub(r'\s+', ' ', env.strip())
967
968         sep = ";" if inline else "\n"
969         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
970
971     def check_errors(self, home, 
972             ecodefile = "exitcode", 
973             stderr = "stderr"):
974         """ Checks whether errors occurred while running a command.
975         It first checks the exit code for the command, and only if the
976         exit code is an error one it returns the error output.
977
978         """
979         proc = None
980         err = ""
981
982         # get exit code saved in the 'exitcode' file
983         ecode = self.exitcode(home, ecodefile)
984
985         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
986             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
987         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
988             # The process returned an error code or didn't exist. 
989             # Check standard error.
990             (err, eerr), proc = self.check_output(home, stderr)
991
992             # If the stderr file was not found, assume nothing bad happened,
993             # and just ignore the error.
994             # (cat returns 1 for error "No such file or directory")
995             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
996                 err = "" 
997             
998         return ("", err), proc
999  
1000     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1001         """ Waits until the pid file for the command is generated, 
1002             and returns the pid and ppid of the process """
1003         pid = ppid = None
1004         delay = 1.0
1005
1006         for i in xrange(2):
1007             pidtuple = self.getpid(home = home, pidfile = pidfile)
1008             
1009             if pidtuple:
1010                 pid, ppid = pidtuple
1011                 break
1012             else:
1013                 time.sleep(delay)
1014                 delay = delay * 1.5
1015         else:
1016             msg = " Failed to get pid for pidfile %s/%s " % (
1017                     home, pidfile )
1018             self.error(msg)
1019             
1020             if raise_on_error:
1021                 raise RuntimeError, msg
1022
1023         return pid, ppid
1024
1025     def wait_run(self, pid, ppid, trial = 0):
1026         """ wait for a remote process to finish execution """
1027         delay = 1.0
1028
1029         while True:
1030             status = self.status(pid, ppid)
1031             
1032             if status is ProcStatus.FINISHED:
1033                 break
1034             elif status is not ProcStatus.RUNNING:
1035                 delay = delay * 1.5
1036                 time.sleep(delay)
1037                 # If it takes more than 20 seconds to start, then
1038                 # asume something went wrong
1039                 if delay > 20:
1040                     break
1041             else:
1042                 # The app is running, just wait...
1043                 time.sleep(0.5)
1044
1045     def check_output(self, home, filename):
1046         """ Retrives content of file """
1047         (out, err), proc = self.execute("cat %s" % 
1048             os.path.join(home, filename), retry = 1, with_lock = True)
1049         return (out, err), proc
1050
1051     def is_alive(self):
1052         """ Checks if host is responsive
1053         """
1054         if self.localhost:
1055             return True
1056
1057         out = err = ""
1058         msg = "Unresponsive host. Wrong answer. "
1059
1060         # The underlying SSH layer will sometimes return an empty
1061         # output (even if the command was executed without errors).
1062         # To work arround this, repeat the operation N times or
1063         # until the result is not empty string
1064         try:
1065             (out, err), proc = self.execute("echo 'ALIVE'",
1066                     blocking = True,
1067                     with_lock = True)
1068     
1069             if out.find("ALIVE") > -1:
1070                 return True
1071         except:
1072             trace = traceback.format_exc()
1073             msg = "Unresponsive host. Error reaching host: %s " % trace
1074
1075         self.error(msg, out, err)
1076         return False
1077
1078     def find_home(self):
1079         """ Retrieves host home directory
1080         """
1081         # The underlying SSH layer will sometimes return an empty
1082         # output (even if the command was executed without errors).
1083         # To work arround this, repeat the operation N times or
1084         # until the result is not empty string
1085         msg = "Impossible to retrieve HOME directory"
1086         try:
1087             (out, err), proc = self.execute("echo ${HOME}",
1088                     blocking = True,
1089                     with_lock = True)
1090     
1091             if out.strip() != "":
1092                 self._home_dir =  out.strip()
1093         except:
1094             trace = traceback.format_exc()
1095             msg = "Impossible to retrieve HOME directory %s" % trace
1096
1097         if not self._home_dir:
1098             self.error(msg)
1099             raise RuntimeError, msg
1100
1101     def filter_existing_files(self, src, dst):
1102         """ Removes files that already exist in the Linux host from src list
1103         """
1104         # construct a dictionary with { dst: src }
1105         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1106                     if len(src) > 1 else dict({dst: src[0]})
1107
1108         command = []
1109         for d in dests.keys():
1110             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1111
1112         command = ";".join(command)
1113
1114         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1115     
1116         for d in dests.keys():
1117             if out.find(d) > -1:
1118                 del dests[d]
1119
1120         if not dests:
1121             return []
1122
1123         return dests.values()
1124