applied the except and raise fixers to the master branch to close the gap with py3
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy, 
21                                      ResourceState)
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33 import traceback
34
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!! 
37
38 class ExitCode:
39     """
40     Error codes that the rexitcode function can return if unable to
41     check the exit code of a spawned process
42     """
43     FILENOTFOUND = -1
44     CORRUPTFILE = -2
45     ERROR = -3
46     OK = 0
47
48 class OSType:
49     """
50     Supported flavors of Linux OS
51     """
52     DEBIAN = 1 
53     UBUNTU = 1 << 1 
54     FEDORA = 1 << 2
55     FEDORA_8 = 1 << 3 | FEDORA 
56     FEDORA_12 = 1 << 4 | FEDORA 
57     FEDORA_14 = 1 << 5 | FEDORA 
58
59 @clsinit_copy
60 class LinuxNode(ResourceManager):
61     """
62     .. class:: Class Args :
63       
64         :param ec: The Experiment controller
65         :type ec: ExperimentController
66         :param guid: guid of the RM
67         :type guid: int
68
69     .. note::
70
71         There are different ways in which commands can be executed using the
72         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
73         'run_and_wait'). 
74         
75         Brief explanation:
76
77             * 'execute' (blocking mode) :  
78
79                      HOW IT WORKS: 'execute', forks a process and run the
80                      command, synchronously, attached to the terminal, in
81                      foreground.
82                      The execute method will block until the command returns
83                      the result on 'out', 'err' (so until it finishes executing).
84   
85                      USAGE: short-lived commands that must be executed attached
86                      to a terminal and in foreground, for which it IS necessary
87                      to block until the command has finished (e.g. if you want
88                      to run 'ls' or 'cat').
89
90             * 'execute' (NON blocking mode - blocking = False) :
91
92                     HOW IT WORKS: Same as before, except that execute method
93                     will return immediately (even if command still running).
94
95                     USAGE: long-lived commands that must be executed attached
96                     to a terminal and in foreground, but for which it is not
97                     necessary to block until the command has finished. (e.g.
98                     start an application using X11 forwarding)
99
100              * 'run' :
101
102                    HOW IT WORKS: Connects to the host ( using SSH if remote)
103                    and launches the command in background, detached from any
104                    terminal (daemonized), and returns. The command continues to
105                    run remotely, but since it is detached from the terminal,
106                    its pipes (stdin, stdout, stderr) can't be redirected to the
107                    console (as normal non detached processes would), and so they
108                    are explicitly redirected to files. The pidfile is created as
109                    part of the process of launching the command. The pidfile
110                    holds the pid and ppid of the process forked in background,
111                    so later on it is possible to check whether the command is still
112                    running.
113
114                     USAGE: long-lived commands that can run detached in background,
115                     for which it is NOT necessary to block (wait) until the command
116                     has finished. (e.g. start an application that is not using X11
117                     forwarding. It can run detached and remotely in background)
118
119              * 'run_and_wait' :
120
121                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122                     the command has finished execution. It also checks whether
123                     errors occurred during runtime by reading the exitcode file,
124                     which contains the exit code of the command that was run
125                     (checking stderr only is not always reliable since many
126                     commands throw debugging info to stderr and the only way to
127                     automatically know whether an error really happened is to
128                     check the process exit code).
129
130                     Another difference with respect to 'run', is that instead
131                     of directly executing the command as a bash command line,
132                     it uploads the command to a bash script and runs the script.
133                     This allows to use the bash script to debug errors, since
134                     it remains at the remote host and can be run manually to
135                     reproduce the error.
136                   
137                     USAGE: medium-lived commands that can run detached in
138                     background, for which it IS necessary to block (wait) until
139                     the command has finished. (e.g. Package installation,
140                     source compilation, file download, etc)
141
142     """
143     _rtype = "linux::Node"
144     _help = "Controls Linux host machines ( either localhost or a host " \
145             "that can be accessed using a SSH key)"
146     _platform = "linux"
147
148     @classmethod
149     def _register_attributes(cls):
150         cls._register_attribute(Attribute(
151             "hostname", "Hostname of the machine",
152             flags = Flags.Design))
153
154         cls._register_attribute(Attribute(
155             "username", "Local account username", 
156             flags = Flags.Credential))
157
158         cls._register_attribute(Attribute(
159             "port", "SSH port",
160             flags = Flags.Design))
161         
162         cls._register_attribute(Attribute(
163             "home",
164             "Experiment home directory to store all experiment related files",
165             flags = Flags.Design))
166         
167         cls._register_attribute(Attribute(
168             "identity", "SSH identity file",
169             flags = Flags.Credential))
170         
171         cls._register_attribute(Attribute(
172             "serverKey", "Server public key", 
173             flags = Flags.Design))
174         
175         cls._register_attribute(Attribute(
176             "cleanHome",
177             "Remove all nepi files and directories "
178             " from node home folder before starting experiment", 
179             type = Types.Bool,
180             default = False,
181             flags = Flags.Design))
182
183         cls._register_attribute(Attribute(
184             "cleanExperiment", "Remove all files and directories " 
185             " from a previous same experiment, before the new experiment starts", 
186             type = Types.Bool,
187             default = False,
188             flags = Flags.Design))
189         
190         cls._register_attribute(Attribute(
191             "cleanProcesses", 
192             "Kill all running processes before starting experiment",
193             type = Types.Bool,
194             default = False,
195             flags = Flags.Design))
196         
197         cls._register_attribute(Attribute(
198             "cleanProcessesAfter", 
199             """Kill all running processes after starting experiment
200             This might be dangerous when using user root""",
201             type = Types.Bool,
202             default = True,
203             flags = Flags.Design))
204         
205         cls._register_attribute(Attribute(
206             "tearDown",
207             "Bash script to be executed before releasing the resource",
208             flags = Flags.Design))
209
210         cls._register_attribute(Attribute(
211             "gatewayUser",
212             "Gateway account username",
213             flags = Flags.Design))
214
215         cls._register_attribute(Attribute(
216             "gateway",
217             "Hostname of the gateway machine",
218             flags = Flags.Design))
219
220         cls._register_attribute(Attribute(
221             "ip",
222             "Linux host public IP address. "
223             "Must not be modified by the user unless hostname is 'localhost'",
224             flags = Flags.Design))
225
226     def __init__(self, ec, guid):
227         super(LinuxNode, self).__init__(ec, guid)
228         self._os = None
229         # home directory at Linux host
230         self._home_dir = ""
231
232         # lock to prevent concurrent applications on the same node,
233         # to execute commands at the same time. There are potential
234         # concurrency issues when using SSH to a same host from 
235         # multiple threads. There are also possible operational 
236         # issues, e.g. an application querying the existence 
237         # of a file or folder prior to its creation, and another 
238         # application creating the same file or folder in between.
239         self._node_lock = threading.Lock()
240         
241     def log_message(self, msg):
242         return " guid {} - host {} - {} "\
243             .format(self.guid, self.get("hostname"), msg)
244
245     @property
246     def home_dir(self):
247         home = self.get("home") or ""
248         if not home.startswith("/"):
249             home = os.path.join(self._home_dir, home) 
250             return home
251
252     @property
253     def nepi_home(self):
254         return os.path.join(self.home_dir, ".nepi")
255
256     @property
257     def usr_dir(self):
258         return os.path.join(self.nepi_home, "nepi-usr")
259
260     @property
261     def lib_dir(self):
262         return os.path.join(self.usr_dir, "lib")
263
264     @property
265     def bin_dir(self):
266         return os.path.join(self.usr_dir, "bin")
267
268     @property
269     def src_dir(self):
270         return os.path.join(self.usr_dir, "src")
271
272     @property
273     def share_dir(self):
274         return os.path.join(self.usr_dir, "share")
275
276     @property
277     def exp_dir(self):
278         return os.path.join(self.nepi_home, "nepi-exp")
279
280     @property
281     def exp_home(self):
282         return os.path.join(self.exp_dir, self.ec.exp_id)
283
284     @property
285     def node_home(self):
286         return os.path.join(self.exp_home, "node-{}".format(self.guid))
287
288     @property
289     def run_home(self):
290         return os.path.join(self.node_home, self.ec.run_id)
291
292     @property
293     def os(self):
294         if self._os:
295             return self._os
296
297         if not self.localhost and not self.get("username"):
298             msg = "Can't resolve OS, insufficient data "
299             self.error(msg)
300             raise RuntimeError(msg)
301
302         out = self.get_os()
303
304         if out.find("Debian") == 0: 
305             self._os = OSType.DEBIAN
306         elif out.find("Ubuntu") ==0:
307             self._os = OSType.UBUNTU
308         elif out.find("Fedora release") == 0:
309             self._os = OSType.FEDORA
310             if out.find("Fedora release 8") == 0:
311                 self._os = OSType.FEDORA_8
312             elif out.find("Fedora release 12") == 0:
313                 self._os = OSType.FEDORA_12
314             elif out.find("Fedora release 14") == 0:
315                 self._os = OSType.FEDORA_14
316         else:
317             msg = "Unsupported OS"
318             self.error(msg, out)
319             raise RuntimeError("{} - {} ".format(msg, out))
320
321         return self._os
322
323     def get_os(self):
324         # The underlying SSH layer will sometimes return an empty
325         # output (even if the command was executed without errors).
326         # To work arround this, repeat the operation N times or
327         # until the result is not empty string
328         out = ""
329         try:
330             (out, err), proc = self.execute("cat /etc/issue", 
331                                             with_lock = True,
332                                             blocking = True)
333         except:
334             trace = traceback.format_exc()
335             msg = "Error detecting OS: {} ".format(trace)
336             self.error(msg, out, err)
337     
338         return out
339
340     @property
341     def use_deb(self):
342         return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
343
344     @property
345     def use_rpm(self):
346         return (self.os & OSType.FEDORA)
347
348     @property
349     def localhost(self):
350         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
351
352     def do_provision(self):
353         # check if host is alive
354         if not self.is_alive():
355             trace = traceback.format_exc()
356             msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
357             self.error(msg)
358             raise RuntimeError(msg)
359
360         self.find_home()
361
362         if self.get("cleanProcesses"):
363             self.clean_processes()
364
365         if self.get("cleanHome"):
366             self.clean_home()
367             
368         if self.get("cleanExperiment"):
369             self.clean_experiment()
370             
371         # Create shared directory structure and node home directory
372         paths = [self.lib_dir, 
373                  self.bin_dir, 
374                  self.src_dir, 
375                  self.share_dir, 
376                  self.node_home]
377
378         self.mkdir(paths)
379
380         # Get Public IP address if possible
381         if not self.get("ip"):
382             try:
383                 ip = sshfuncs.gethostbyname(self.get("hostname"))
384                 self.set("ip", ip)
385             except:
386                 if self.get("gateway") is None:
387                     msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
388                     self.error(msg)
389
390         super(LinuxNode, self).do_provision()
391
392     def do_deploy(self):
393         if self.state == ResourceState.NEW:
394             self.info("Deploying node")
395             self.do_discover()
396             self.do_provision()
397
398         # Node needs to wait until all associated interfaces are 
399         # ready before it can finalize deployment
400         from nepi.resources.linux.interface import LinuxInterface
401         ifaces = self.get_connected(LinuxInterface.get_rtype())
402         for iface in ifaces:
403             if iface.state < ResourceState.READY:
404                 self.ec.schedule(self.reschedule_delay, self.deploy)
405                 return 
406
407         super(LinuxNode, self).do_deploy()
408
409     def do_release(self):
410         rms = self.get_connected()
411         for rm in rms:
412             # Node needs to wait until all associated RMs are released
413             # before it can be released
414             if rm.state != ResourceState.RELEASED:
415                 self.ec.schedule(self.reschedule_delay, self.release)
416                 return 
417
418         tear_down = self.get("tearDown")
419         if tear_down:
420             self.execute(tear_down)
421
422         if self.get("cleanProcessesAfter"):
423             self.clean_processes()
424
425         super(LinuxNode, self).do_release()
426
427     def valid_connection(self, guid):
428         # TODO: Validate!
429         return True
430
431     def clean_processes(self):
432         self.info("Cleaning up processes")
433
434         if self.localhost:
435             return 
436         
437         if self.get("username") != 'root':
438             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
439                    "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
440                    "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
441         else:
442             if self.state >= ResourceState.READY:
443                 ########################
444                 #Collect all process (must change for a more intelligent way)
445                 ppid = []
446                 pids = []
447                 avoid_pids = "ps axjf | awk '{print $1,$2}'"
448                 (out, err), proc = self.execute(avoid_pids)
449                 if len(out) != 0:
450                     for line in out.strip().split("\n"):
451                         parts = line.strip().split(" ")
452                         ppid.append(parts[0])
453                         pids.append(parts[1])
454
455                 #Collect all process below ssh -D
456                 tree_owner = 0
457                 ssh_pids = []
458                 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
459                 (out, err), proc = self.execute(sshs)
460                 if len(out) != 0:
461                     for line in out.strip().split("\n"):
462                         parts = line.strip().split(" ")
463                         if parts[1].startswith('root@pts'):
464                             ssh_pids.append(parts[0])
465                         elif parts[1] == "-D":
466                             tree_owner = parts[0]
467
468                 avoid_kill = []
469                 temp = []
470                 #Search for the child process of the pid's collected at the first block.
471                 for process in ssh_pids:
472                     temp = self.search_for_child(process, pids, ppid)
473                     avoid_kill = list(set(temp))
474                 
475                 if len(avoid_kill) > 0:
476                     avoid_kill.append(tree_owner) 
477                 ########################
478
479                 import pickle
480                 with open("/tmp/save.proc", "rb") as pickle_file:
481                     pids = pickle.load(pickle_file)
482                 pids_temp = dict()
483                 ps_aux = "ps aux | awk '{print $2,$11}'"
484                 (out, err), proc = self.execute(ps_aux)
485                 if len(out) != 0:
486                     for line in out.strip().split("\n"):
487                         parts = line.strip().split(" ")
488                         pids_temp[parts[0]] = parts[1]
489                     # creates the difference between the machine pids freezed (pickle) and the actual
490                     # adding the avoided pids filtered above (avoid_kill) to allow users keep process
491                     # alive when using besides ssh connections  
492                     kill_pids = set(pids_temp.items()) - set(pids.items())
493                     kill_pids = ' '.join(dict(kill_pids).keys())
494
495                     # removing pids from beside connections and its process
496                     kill_pids = kill_pids.split(' ')
497                     kill_pids = list(set(kill_pids) - set(avoid_kill))
498                     kill_pids = ' '.join(kill_pids)
499
500                     cmd = ("killall tcpdump || /bin/true ; " +
501                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
502                            "kill {} || /bin/true ; ".format(kill_pids))
503                 else:
504                     cmd = ("killall tcpdump || /bin/true ; " +
505                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
506             else:
507                 cmd = ("killall tcpdump || /bin/true ; " +
508                        "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
509
510         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
511
512     def search_for_child(self, pid, pids, ppid, family=[]):
513         """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
514         """
515         family.append(pid)
516         for key, value in enumerate(ppid):
517             if value == pid:
518                 child = pids[key]
519                 self.search_for_child(child, pids, ppid)
520         return family
521         
522     def clean_home(self):
523         """ Cleans all NEPI related folders in the Linux host
524         """
525         self.info("Cleaning up home")
526         
527         cmd = "cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "\
528               .format(self.home_dir)
529
530         return self.execute(cmd, with_lock = True)
531
532     def clean_experiment(self):
533         """ Cleans all experiment related files in the Linux host.
534         It preserves NEPI files and folders that have a multi experiment
535         scope.
536         """
537         self.info("Cleaning up experiment files")
538         
539         cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "\
540               .format(self.exp_dir, self.ec.exp_id)
541         
542         return self.execute(cmd, with_lock = True)
543
544     def execute(self, command,
545                 sudo = False,
546                 env = None,
547                 tty = False,
548                 forward_x11 = False,
549                 retry = 3,
550                 connect_timeout = 30,
551                 strict_host_checking = False,
552                 persistent = True,
553                 blocking = True,
554                 with_lock = False
555     ):
556         """ Notice that this invocation will block until the
557         execution finishes. If this is not the desired behavior,
558         use 'run' instead."""
559
560         if self.localhost:
561             (out, err), proc = execfuncs.lexec(
562                 command, 
563                 user = self.get("username"), # still problem with localhost
564                 sudo = sudo,
565                 env = env)
566         else:
567             if with_lock:
568                 # If the execute command is blocking, we don't want to keep
569                 # the node lock. This lock is used to avoid race conditions
570                 # when creating the ControlMaster sockets. A more elegant
571                 # solution is needed.
572                 with self._node_lock:
573                     (out, err), proc = sshfuncs.rexec(
574                         command, 
575                         host = self.get("hostname"),
576                         user = self.get("username"),
577                         port = self.get("port"),
578                         gwuser = self.get("gatewayUser"),
579                         gw = self.get("gateway"),
580                         agent = True,
581                         sudo = sudo,
582                         identity = self.get("identity"),
583                         server_key = self.get("serverKey"),
584                         env = env,
585                         tty = tty,
586                         forward_x11 = forward_x11,
587                         retry = retry,
588                         connect_timeout = connect_timeout,
589                         persistent = persistent,
590                         blocking = blocking, 
591                         strict_host_checking = strict_host_checking
592                     )
593             else:
594                 (out, err), proc = sshfuncs.rexec(
595                     command, 
596                     host = self.get("hostname"),
597                     user = self.get("username"),
598                     port = self.get("port"),
599                     gwuser = self.get("gatewayUser"),
600                     gw = self.get("gateway"),
601                     agent = True,
602                     sudo = sudo,
603                     identity = self.get("identity"),
604                     server_key = self.get("serverKey"),
605                     env = env,
606                     tty = tty,
607                     forward_x11 = forward_x11,
608                     retry = retry,
609                     connect_timeout = connect_timeout,
610                     persistent = persistent,
611                     blocking = blocking, 
612                     strict_host_checking = strict_host_checking
613                 )
614
615         return (out, err), proc
616
617     def run(self, command, home,
618             create_home = False,
619             pidfile = 'pidfile',
620             stdin = None, 
621             stdout = 'stdout', 
622             stderr = 'stderr', 
623             sudo = False,
624             tty = False,
625             strict_host_checking = False):
626         
627         self.debug("Running command '{}'".format(command))
628         
629         if self.localhost:
630             (out, err), proc = execfuncs.lspawn(
631                 command, pidfile,
632                 home = home, 
633                 create_home = create_home, 
634                 stdin = stdin or '/dev/null',
635                 stdout = stdout or '/dev/null',
636                 stderr = stderr or '/dev/null',
637                 sudo = sudo) 
638         else:
639             with self._node_lock:
640                 (out, err), proc = sshfuncs.rspawn(
641                     command,
642                     pidfile = pidfile,
643                     home = home,
644                     create_home = create_home,
645                     stdin = stdin or '/dev/null',
646                     stdout = stdout or '/dev/null',
647                     stderr = stderr or '/dev/null',
648                     sudo = sudo,
649                     host = self.get("hostname"),
650                     user = self.get("username"),
651                     port = self.get("port"),
652                     gwuser = self.get("gatewayUser"),
653                     gw = self.get("gateway"),
654                     agent = True,
655                     identity = self.get("identity"),
656                     server_key = self.get("serverKey"),
657                     tty = tty,
658                     strict_host_checking = strict_host_checking
659                 )
660
661         return (out, err), proc
662
663     def getpid(self, home, pidfile = "pidfile"):
664         if self.localhost:
665             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
666         else:
667             with self._node_lock:
668                 pidtuple = sshfuncs.rgetpid(
669                     os.path.join(home, pidfile),
670                     host = self.get("hostname"),
671                     user = self.get("username"),
672                     port = self.get("port"),
673                     gwuser = self.get("gatewayUser"),
674                     gw = self.get("gateway"),
675                     agent = True,
676                     identity = self.get("identity"),
677                     server_key = self.get("serverKey"),
678                     strict_host_checking = False
679                 )
680         
681         return pidtuple
682
683     def status(self, pid, ppid):
684         if self.localhost:
685             status = execfuncs.lstatus(pid, ppid)
686         else:
687             with self._node_lock:
688                 status = sshfuncs.rstatus(
689                     pid, ppid,
690                     host = self.get("hostname"),
691                     user = self.get("username"),
692                     port = self.get("port"),
693                     gwuser = self.get("gatewayUser"),
694                     gw = self.get("gateway"),
695                     agent = True,
696                     identity = self.get("identity"),
697                     server_key = self.get("serverKey"),
698                     strict_host_checking = False
699                 )
700            
701         return status
702     
703     def kill(self, pid, ppid, sudo = False):
704         out = err = ""
705         proc = None
706         status = self.status(pid, ppid)
707
708         if status == sshfuncs.ProcStatus.RUNNING:
709             if self.localhost:
710                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
711             else:
712                 with self._node_lock:
713                     (out, err), proc = sshfuncs.rkill(
714                         pid, ppid,
715                         host = self.get("hostname"),
716                         user = self.get("username"),
717                         port = self.get("port"),
718                         gwuser = self.get("gatewayUser"),
719                         gw = self.get("gateway"),
720                         agent = True,
721                         sudo = sudo,
722                         identity = self.get("identity"),
723                         server_key = self.get("serverKey"),
724                         strict_host_checking = False
725                     )
726
727         return (out, err), proc
728
729     def copy(self, src, dst):
730         if self.localhost:
731             (out, err), proc = execfuncs.lcopy(
732                 src, dst, 
733                 recursive = True)
734         else:
735             with self._node_lock:
736                 (out, err), proc = sshfuncs.rcopy(
737                     src, dst, 
738                     port = self.get("port"),
739                     gwuser = self.get("gatewayUser"),
740                     gw = self.get("gateway"),
741                     identity = self.get("identity"),
742                     server_key = self.get("serverKey"),
743                     recursive = True,
744                     strict_host_checking = False)
745
746         return (out, err), proc
747
748     def upload(self, src, dst, text = False, overwrite = True,
749                raise_on_error = True):
750         """ Copy content to destination
751
752         src  string with the content to copy. Can be:
753             - plain text
754             - a string with the path to a local file
755             - a string with a semi-colon separeted list of local files
756             - a string with a local directory
757
758         dst  string with destination path on the remote host (remote is 
759             always self.host)
760
761         text src is text input, it must be stored into a temp file before 
762         uploading
763         """
764         # If source is a string input 
765         f = None
766         if text and not os.path.isfile(src):
767             # src is text input that should be uploaded as file
768             # create a temporal file with the content to upload
769             f = tempfile.NamedTemporaryFile(delete=False)
770             f.write(src)
771             f.close()
772             src = f.name
773
774         # If dst files should not be overwritten, check that the files do not
775         # exits already
776         if isinstance(src, str):
777             src = map(str.strip, src.split(";"))
778     
779         if overwrite == False:
780             src = self.filter_existing_files(src, dst)
781             if not src:
782                 return ("", ""), None
783
784         if not self.localhost:
785             # Build destination as <user>@<server>:<path>
786             dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
787
788         ((out, err), proc) = self.copy(src, dst)
789
790         # clean up temp file
791         if f:
792             os.remove(f.name)
793
794         if err:
795             msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
796             self.error(msg, out, err)
797             
798             msg = "{} out: {} err: {}".format(msg, out, err)
799             if raise_on_error:
800                 raise RuntimeError(msg)
801
802         return ((out, err), proc)
803
804     def download(self, src, dst, raise_on_error = True):
805         if not self.localhost:
806             # Build destination as <user>@<server>:<path>
807             src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
808
809         ((out, err), proc) = self.copy(src, dst)
810
811         if err:
812             msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst) 
813             self.error(msg, out, err)
814
815             if raise_on_error:
816                 raise RuntimeError(msg)
817
818         return ((out, err), proc)
819
820     def install_packages_command(self, packages):
821         command = ""
822         if self.use_rpm:
823             command = rpmfuncs.install_packages_command(self.os, packages)
824         elif self.use_deb:
825             command = debfuncs.install_packages_command(self.os, packages)
826         else:
827             msg = "Error installing packages ( OS not known ) "
828             self.error(msg, self.os)
829             raise RuntimeError(msg)
830
831         return command
832
833     def install_packages(self, packages, home,
834                          run_home = None,
835                          raise_on_error = True):
836         """ Install packages in the Linux host.
837
838         'home' is the directory to upload the package installation script.
839         'run_home' is the directory from where to execute the script.
840         """
841         command = self.install_packages_command(packages)
842
843         run_home = run_home or home
844
845         (out, err), proc = self.run_and_wait(command, run_home, 
846                                              shfile = os.path.join(home, "instpkg.sh"),
847                                              pidfile = "instpkg_pidfile",
848                                              ecodefile = "instpkg_exitcode",
849                                              stdout = "instpkg_stdout", 
850                                              stderr = "instpkg_stderr",
851                                              overwrite = False,
852                                              raise_on_error = raise_on_error)
853
854         return (out, err), proc 
855
856     def remove_packages(self, packages, home, run_home = None,
857                         raise_on_error = True):
858         """ Uninstall packages from the Linux host.
859
860         'home' is the directory to upload the package un-installation script.
861         'run_home' is the directory from where to execute the script.
862         """
863         if self.use_rpm:
864             command = rpmfuncs.remove_packages_command(self.os, packages)
865         elif self.use_deb:
866             command = debfuncs.remove_packages_command(self.os, packages)
867         else:
868             msg = "Error removing packages ( OS not known ) "
869             self.error(msg)
870             raise RuntimeError(msg)
871
872         run_home = run_home or home
873
874         (out, err), proc = self.run_and_wait(command, run_home, 
875                                              shfile = os.path.join(home, "rmpkg.sh"),
876                                              pidfile = "rmpkg_pidfile",
877                                              ecodefile = "rmpkg_exitcode",
878                                              stdout = "rmpkg_stdout", 
879                                              stderr = "rmpkg_stderr",
880                                              overwrite = False,
881                                              raise_on_error = raise_on_error)
882         
883         return (out, err), proc 
884
885     def mkdir(self, paths, clean = False):
886         """ Paths is either a single remote directory path to create,
887         or a list of directories to create.
888         """
889         if clean:
890             self.rmdir(paths)
891
892         if isinstance(paths, str):
893             paths = [paths]
894
895         cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
896
897         return self.execute(cmd, with_lock = True)
898
899     def rmdir(self, paths):
900         """ Paths is either a single remote directory path to delete,
901         or a list of directories to delete.
902         """
903
904         if isinstance(paths, str):
905             paths = [paths]
906
907         cmd = " ; ".join(map(lambda path: "rm -rf {}".format(path), paths))
908
909         return self.execute(cmd, with_lock = True)
910     
911     def run_and_wait(self, command, home, 
912                      shfile="cmd.sh",
913                      env=None,
914                      overwrite=True,
915                      wait_run=True,
916                      pidfile="pidfile", 
917                      ecodefile="exitcode", 
918                      stdin=None, 
919                      stdout="stdout", 
920                      stderr="stderr", 
921                      sudo=False,
922                      tty=False,
923                      raise_on_error=True):
924         """
925         Uploads the 'command' to a bash script in the host.
926         Then runs the script detached in background in the host, and
927         busy-waites until the script finishes executing.
928         """
929
930         if not shfile.startswith("/"):
931             shfile = os.path.join(home, shfile)
932
933         self.upload_command(command, 
934                             shfile = shfile, 
935                             ecodefile = ecodefile, 
936                             env = env,
937                             overwrite = overwrite)
938
939         command = "bash {}".format(shfile)
940         # run command in background in remote host
941         (out, err), proc = self.run(command, home, 
942                                     pidfile = pidfile,
943                                     stdin = stdin, 
944                                     stdout = stdout, 
945                                     stderr = stderr, 
946                                     sudo = sudo,
947                                     tty = tty)
948
949         # check no errors occurred
950         if proc.poll():
951             msg = " Failed to run command '{}' ".format(command)
952             self.error(msg, out, err)
953             if raise_on_error:
954                 raise RuntimeError(msg)
955
956         # Wait for pid file to be generated
957         pid, ppid = self.wait_pid(
958             home = home, 
959             pidfile = pidfile, 
960             raise_on_error = raise_on_error)
961
962         if wait_run:
963             # wait until command finishes to execute
964             self.wait_run(pid, ppid)
965             
966             (eout, err), proc = self.check_errors(home,
967                                                   ecodefile = ecodefile,
968                                                   stderr = stderr)
969
970             # Out is what was written in the stderr file
971             if err:
972                 msg = " Failed to run command '{}' ".format(command)
973                 self.error(msg, eout, err)
974
975                 if raise_on_error:
976                     raise RuntimeError(msg)
977
978         (out, oerr), proc = self.check_output(home, stdout)
979         
980         return (out, err), proc
981         
982     def exitcode(self, home, ecodefile = "exitcode"):
983         """
984         Get the exit code of an application.
985         Returns an integer value with the exit code 
986         """
987         (out, err), proc = self.check_output(home, ecodefile)
988
989         # Succeeded to open file, return exit code in the file
990         if proc.wait() == 0:
991             try:
992                 return int(out.strip())
993             except:
994                 # Error in the content of the file!
995                 return ExitCode.CORRUPTFILE
996
997         # No such file or directory
998         if proc.returncode == 1:
999             return ExitCode.FILENOTFOUND
1000         
1001         # Other error from 'cat'
1002         return ExitCode.ERROR
1003
1004     def upload_command(self, command, 
1005                        shfile="cmd.sh",
1006                        ecodefile="exitcode",
1007                        overwrite=True,
1008                        env=None):
1009         """ Saves the command as a bash script file in the remote host, and
1010         forces to save the exit code of the command execution to the ecodefile
1011         """
1012
1013         if not (command.strip().endswith(";") or command.strip().endswith("&")):
1014             command += ";"
1015             
1016         # The exit code of the command will be stored in ecodefile
1017         command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1018                   .format(command=command, ecodefile=ecodefile)
1019
1020         # Export environment
1021         environ = self.format_environment(env)
1022
1023         # Add environ to command
1024         command = environ + command
1025
1026         return self.upload(command, shfile, text=True, overwrite=overwrite)
1027
1028     def format_environment(self, env, inline=False):
1029         """ Formats the environment variables for a command to be executed
1030         either as an inline command
1031         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
1032         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
1033         """
1034         if not env: return ""
1035
1036         # Remove extra white spaces
1037         env = re.sub(r'\s+', ' ', env.strip())
1038
1039         sep = ";" if inline else "\n"
1040         return sep.join([" export {}".format(e) for e in env.split(" ")]) + sep 
1041
1042     def check_errors(self, home, 
1043                      ecodefile = "exitcode", 
1044                      stderr = "stderr"):
1045         """ Checks whether errors occurred while running a command.
1046         It first checks the exit code for the command, and only if the
1047         exit code is an error one it returns the error output.
1048
1049         """
1050         proc = None
1051         err = ""
1052
1053         # get exit code saved in the 'exitcode' file
1054         ecode = self.exitcode(home, ecodefile)
1055
1056         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1057             err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1058         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1059             # The process returned an error code or didn't exist. 
1060             # Check standard error.
1061             (err, eerr), proc = self.check_output(home, stderr)
1062
1063             # If the stderr file was not found, assume nothing bad happened,
1064             # and just ignore the error.
1065             # (cat returns 1 for error "No such file or directory")
1066             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1067                 err = "" 
1068                 
1069         return ("", err), proc
1070     
1071     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1072         """ Waits until the pid file for the command is generated, 
1073             and returns the pid and ppid of the process """
1074         pid = ppid = None
1075         delay = 1.0
1076
1077         for i in xrange(2):
1078             pidtuple = self.getpid(home = home, pidfile = pidfile)
1079             
1080             if pidtuple:
1081                 pid, ppid = pidtuple
1082                 break
1083             else:
1084                 time.sleep(delay)
1085                 delay = delay * 1.5
1086         else:
1087             msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1088             self.error(msg)
1089     
1090             if raise_on_error:
1091                 raise RuntimeError(msg)
1092
1093         return pid, ppid
1094
1095     def wait_run(self, pid, ppid, trial = 0):
1096         """ wait for a remote process to finish execution """
1097         delay = 1.0
1098
1099         while True:
1100             status = self.status(pid, ppid)
1101             
1102             if status is ProcStatus.FINISHED:
1103                 break
1104             elif status is not ProcStatus.RUNNING:
1105                 delay = delay * 1.5
1106                 time.sleep(delay)
1107                 # If it takes more than 20 seconds to start, then
1108                 # asume something went wrong
1109                 if delay > 20:
1110                     break
1111             else:
1112                 # The app is running, just wait...
1113                 time.sleep(0.5)
1114
1115     def check_output(self, home, filename):
1116         """ Retrives content of file """
1117         (out, err), proc = self.execute(
1118             "cat {}".format(os.path.join(home, filename)), retry = 1, with_lock = True)
1119         return (out, err), proc
1120
1121     def is_alive(self):
1122         """ Checks if host is responsive
1123         """
1124         if self.localhost:
1125             return True
1126
1127         out = err = ""
1128         msg = "Unresponsive host. Wrong answer. "
1129
1130         # The underlying SSH layer will sometimes return an empty
1131         # output (even if the command was executed without errors).
1132         # To work arround this, repeat the operation N times or
1133         # until the result is not empty string
1134         try:
1135             (out, err), proc = self.execute("echo 'ALIVE'",
1136                                             blocking = True,
1137                                             with_lock = True)
1138             
1139             if out.find("ALIVE") > -1:
1140                 return True
1141         except:
1142             trace = traceback.format_exc()
1143             msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1144
1145         self.error(msg, out, err)
1146         return False
1147
1148     def find_home(self):
1149         """ 
1150         Retrieves host home directory
1151         """
1152         # The underlying SSH layer will sometimes return an empty
1153         # output (even if the command was executed without errors).
1154         # To work arround this, repeat the operation N times or
1155         # until the result is not empty string
1156         msg = "Impossible to retrieve HOME directory"
1157         try:
1158             (out, err), proc = self.execute("echo ${HOME}",
1159                                             blocking = True,
1160                                             with_lock = True)
1161             
1162             if out.strip() != "":
1163                 self._home_dir =  out.strip()
1164         except:
1165             trace = traceback.format_exc()
1166             msg = "Impossible to retrieve HOME directory {}".format(trace)
1167
1168         if not self._home_dir:
1169             self.error(msg)
1170             raise RuntimeError(msg)
1171
1172     def filter_existing_files(self, src, dst):
1173         """ Removes files that already exist in the Linux host from src list
1174         """
1175         # construct a dictionary with { dst: src }
1176         dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1177                 if len(src) > 1 else {dst: src[0]}
1178
1179         command = []
1180         for d in dests.keys():
1181             command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1182
1183         command = ";".join(command)
1184
1185         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1186         
1187         for d in dests.keys():
1188             if out.find(d) > -1:
1189                 del dests[d]
1190
1191         if not dests:
1192             return []
1193
1194         return dests.values()
1195