still making both branches closer
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy, 
21                                      ResourceState)
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33 import traceback
34
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!! 
37
38 class ExitCode:
39     """
40     Error codes that the rexitcode function can return if unable to
41     check the exit code of a spawned process
42     """
43     FILENOTFOUND = -1
44     CORRUPTFILE = -2
45     ERROR = -3
46     OK = 0
47
48 class OSType:
49     """
50     Supported flavors of Linux OS
51     """
52     DEBIAN = 1 
53     UBUNTU = 1 << 1 
54     FEDORA = 1 << 2
55     FEDORA_8 = 1 << 3 | FEDORA 
56     FEDORA_12 = 1 << 4 | FEDORA 
57     FEDORA_14 = 1 << 5 | FEDORA 
58
59 @clsinit_copy
60 class LinuxNode(ResourceManager):
61     """
62     .. class:: Class Args :
63       
64         :param ec: The Experiment controller
65         :type ec: ExperimentController
66         :param guid: guid of the RM
67         :type guid: int
68
69     .. note::
70
71         There are different ways in which commands can be executed using the
72         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
73         'run_and_wait'). 
74         
75         Brief explanation:
76
77             * 'execute' (blocking mode) :  
78
79                      HOW IT WORKS: 'execute', forks a process and run the
80                      command, synchronously, attached to the terminal, in
81                      foreground.
82                      The execute method will block until the command returns
83                      the result on 'out', 'err' (so until it finishes executing).
84   
85                      USAGE: short-lived commands that must be executed attached
86                      to a terminal and in foreground, for which it IS necessary
87                      to block until the command has finished (e.g. if you want
88                      to run 'ls' or 'cat').
89
90             * 'execute' (NON blocking mode - blocking = False) :
91
92                     HOW IT WORKS: Same as before, except that execute method
93                     will return immediately (even if command still running).
94
95                     USAGE: long-lived commands that must be executed attached
96                     to a terminal and in foreground, but for which it is not
97                     necessary to block until the command has finished. (e.g.
98                     start an application using X11 forwarding)
99
100              * 'run' :
101
102                    HOW IT WORKS: Connects to the host ( using SSH if remote)
103                    and launches the command in background, detached from any
104                    terminal (daemonized), and returns. The command continues to
105                    run remotely, but since it is detached from the terminal,
106                    its pipes (stdin, stdout, stderr) can't be redirected to the
107                    console (as normal non detached processes would), and so they
108                    are explicitly redirected to files. The pidfile is created as
109                    part of the process of launching the command. The pidfile
110                    holds the pid and ppid of the process forked in background,
111                    so later on it is possible to check whether the command is still
112                    running.
113
114                     USAGE: long-lived commands that can run detached in background,
115                     for which it is NOT necessary to block (wait) until the command
116                     has finished. (e.g. start an application that is not using X11
117                     forwarding. It can run detached and remotely in background)
118
119              * 'run_and_wait' :
120
121                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122                     the command has finished execution. It also checks whether
123                     errors occurred during runtime by reading the exitcode file,
124                     which contains the exit code of the command that was run
125                     (checking stderr only is not always reliable since many
126                     commands throw debugging info to stderr and the only way to
127                     automatically know whether an error really happened is to
128                     check the process exit code).
129
130                     Another difference with respect to 'run', is that instead
131                     of directly executing the command as a bash command line,
132                     it uploads the command to a bash script and runs the script.
133                     This allows to use the bash script to debug errors, since
134                     it remains at the remote host and can be run manually to
135                     reproduce the error.
136                   
137                     USAGE: medium-lived commands that can run detached in
138                     background, for which it IS necessary to block (wait) until
139                     the command has finished. (e.g. Package installation,
140                     source compilation, file download, etc)
141
142     """
143     _rtype = "linux::Node"
144     _help = "Controls Linux host machines ( either localhost or a host " \
145             "that can be accessed using a SSH key)"
146     _platform = "linux"
147
148     @classmethod
149     def _register_attributes(cls):
150         cls._register_attribute(Attribute(
151             "hostname", "Hostname of the machine",
152             flags = Flags.Design))
153
154         cls._register_attribute(Attribute(
155             "username", "Local account username", 
156             flags = Flags.Credential))
157
158         cls._register_attribute(Attribute(
159             "port", "SSH port",
160             flags = Flags.Design))
161         
162         cls._register_attribute(Attribute(
163             "home",
164             "Experiment home directory to store all experiment related files",
165             flags = Flags.Design))
166         
167         cls._register_attribute(Attribute(
168             "identity", "SSH identity file",
169             flags = Flags.Credential))
170         
171         cls._register_attribute(Attribute(
172             "serverKey", "Server public key", 
173             flags = Flags.Design))
174         
175         cls._register_attribute(Attribute(
176             "cleanHome",
177             "Remove all nepi files and directories "
178             " from node home folder before starting experiment", 
179             type = Types.Bool,
180             default = False,
181             flags = Flags.Design))
182
183         cls._register_attribute(Attribute(
184             "cleanExperiment", "Remove all files and directories " 
185             " from a previous same experiment, before the new experiment starts", 
186             type = Types.Bool,
187             default = False,
188             flags = Flags.Design))
189         
190         cls._register_attribute(Attribute(
191             "cleanProcesses", 
192             "Kill all running processes before starting experiment",
193             type = Types.Bool,
194             default = False,
195             flags = Flags.Design))
196         
197         cls._register_attribute(Attribute(
198             "cleanProcessesAfter", 
199             """Kill all running processes after starting experiment
200             This might be dangerous when using user root""",
201             type = Types.Bool,
202             default = True,
203             flags = Flags.Design))
204         
205         cls._register_attribute(Attribute(
206             "tearDown",
207             "Bash script to be executed before releasing the resource",
208             flags = Flags.Design))
209
210         cls._register_attribute(Attribute(
211             "gatewayUser",
212             "Gateway account username",
213             flags = Flags.Design))
214
215         cls._register_attribute(Attribute(
216             "gateway",
217             "Hostname of the gateway machine",
218             flags = Flags.Design))
219
220         cls._register_attribute(Attribute(
221             "ip",
222             "Linux host public IP address. "
223             "Must not be modified by the user unless hostname is 'localhost'",
224             flags = Flags.Design))
225
226     def __init__(self, ec, guid):
227         super(LinuxNode, self).__init__(ec, guid)
228         self._os = None
229         # home directory at Linux host
230         self._home_dir = ""
231
232         # lock to prevent concurrent applications on the same node,
233         # to execute commands at the same time. There are potential
234         # concurrency issues when using SSH to a same host from 
235         # multiple threads. There are also possible operational 
236         # issues, e.g. an application querying the existence 
237         # of a file or folder prior to its creation, and another 
238         # application creating the same file or folder in between.
239         self._node_lock = threading.Lock()
240         
241     def log_message(self, msg):
242         return " guid {} - host {} - {} "\
243             .format(self.guid, self.get("hostname"), msg)
244
245     @property
246     def home_dir(self):
247         home = self.get("home") or ""
248         if not home.startswith("/"):
249             home = os.path.join(self._home_dir, home) 
250             return home
251
252     @property
253     def nepi_home(self):
254         return os.path.join(self.home_dir, ".nepi")
255
256     @property
257     def usr_dir(self):
258         return os.path.join(self.nepi_home, "nepi-usr")
259
260     @property
261     def lib_dir(self):
262         return os.path.join(self.usr_dir, "lib")
263
264     @property
265     def bin_dir(self):
266         return os.path.join(self.usr_dir, "bin")
267
268     @property
269     def src_dir(self):
270         return os.path.join(self.usr_dir, "src")
271
272     @property
273     def share_dir(self):
274         return os.path.join(self.usr_dir, "share")
275
276     @property
277     def exp_dir(self):
278         return os.path.join(self.nepi_home, "nepi-exp")
279
280     @property
281     def exp_home(self):
282         return os.path.join(self.exp_dir, self.ec.exp_id)
283
284     @property
285     def node_home(self):
286         return os.path.join(self.exp_home, "node-{}".format(self.guid))
287
288     @property
289     def run_home(self):
290         return os.path.join(self.node_home, self.ec.run_id)
291
292     @property
293     def os(self):
294         if self._os:
295             return self._os
296
297         if not self.localhost and not self.get("username"):
298             msg = "Can't resolve OS, insufficient data "
299             self.error(msg)
300             raise RuntimeError(msg)
301
302         out = self.get_os()
303
304         if out.find("Debian") == 0: 
305             self._os = OSType.DEBIAN
306         elif out.find("Ubuntu") ==0:
307             self._os = OSType.UBUNTU
308         elif out.find("Fedora release") == 0:
309             self._os = OSType.FEDORA
310             if out.find("Fedora release 8") == 0:
311                 self._os = OSType.FEDORA_8
312             elif out.find("Fedora release 12") == 0:
313                 self._os = OSType.FEDORA_12
314             elif out.find("Fedora release 14") == 0:
315                 self._os = OSType.FEDORA_14
316         else:
317             msg = "Unsupported OS"
318             self.error(msg, out)
319             raise RuntimeError("{} - {} ".format(msg, out))
320
321         return self._os
322
323     def get_os(self):
324         # The underlying SSH layer will sometimes return an empty
325         # output (even if the command was executed without errors).
326         # To work arround this, repeat the operation N times or
327         # until the result is not empty string
328         out = ""
329         try:
330             (out, err), proc = self.execute("cat /etc/issue", 
331                                             with_lock = True,
332                                             blocking = True)
333         except:
334             trace = traceback.format_exc()
335             msg = "Error detecting OS: {} ".format(trace)
336             self.error(msg, out, err)
337     
338         return out
339
340     @property
341     def use_deb(self):
342         return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
343
344     @property
345     def use_rpm(self):
346         return (self.os & OSType.FEDORA)
347
348     @property
349     def localhost(self):
350         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
351
352     def do_provision(self):
353         # check if host is alive
354         if not self.is_alive():
355             trace = traceback.format_exc()
356             msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
357             self.error(msg)
358             raise RuntimeError(msg)
359
360         self.find_home()
361
362         if self.get("cleanProcesses"):
363             self.clean_processes()
364
365         if self.get("cleanHome"):
366             self.clean_home()
367             
368         if self.get("cleanExperiment"):
369             self.clean_experiment()
370             
371         # Create shared directory structure and node home directory
372         paths = [self.lib_dir, 
373                  self.bin_dir, 
374                  self.src_dir, 
375                  self.share_dir, 
376                  self.node_home]
377
378         self.mkdir(paths)
379
380         # Get Public IP address if possible
381         if not self.get("ip"):
382             try:
383                 ip = sshfuncs.gethostbyname(self.get("hostname"))
384                 self.set("ip", ip)
385             except:
386                 if self.get("gateway") is None:
387                     msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
388                     self.error(msg)
389
390         super(LinuxNode, self).do_provision()
391
392     def do_deploy(self):
393         if self.state == ResourceState.NEW:
394             self.info("Deploying node")
395             self.do_discover()
396             self.do_provision()
397
398         # Node needs to wait until all associated interfaces are 
399         # ready before it can finalize deployment
400         from nepi.resources.linux.interface import LinuxInterface
401         ifaces = self.get_connected(LinuxInterface.get_rtype())
402         for iface in ifaces:
403             if iface.state < ResourceState.READY:
404                 self.ec.schedule(self.reschedule_delay, self.deploy)
405                 return 
406
407         super(LinuxNode, self).do_deploy()
408
409     def do_release(self):
410         rms = self.get_connected()
411         for rm in rms:
412             # Node needs to wait until all associated RMs are released
413             # before it can be released
414             if rm.state != ResourceState.RELEASED:
415                 self.ec.schedule(self.reschedule_delay, self.release)
416                 return 
417
418         tear_down = self.get("tearDown")
419         if tear_down:
420             self.execute(tear_down)
421
422         if self.get("cleanProcessesAfter"):
423             self.clean_processes()
424
425         super(LinuxNode, self).do_release()
426
427     def valid_connection(self, guid):
428         # TODO: Validate!
429         return True
430
431     def clean_processes(self):
432         self.info("Cleaning up processes")
433
434         if self.localhost:
435             return 
436         
437         if self.get("username") != 'root':
438             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
439                    "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
440                    "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
441         else:
442             if self.state >= ResourceState.READY:
443                 ########################
444                 #Collect all process (must change for a more intelligent way)
445                 ppid = []
446                 pids = []
447                 avoid_pids = "ps axjf | awk '{print $1,$2}'"
448                 (out, err), proc = self.execute(avoid_pids)
449                 if len(out) != 0:
450                     for line in out.strip().split("\n"):
451                         parts = line.strip().split(" ")
452                         ppid.append(parts[0])
453                         pids.append(parts[1])
454
455                 #Collect all process below ssh -D
456                 tree_owner = 0
457                 ssh_pids = []
458                 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
459                 (out, err), proc = self.execute(sshs)
460                 if len(out) != 0:
461                     for line in out.strip().split("\n"):
462                         parts = line.strip().split(" ")
463                         if parts[1].startswith('root@pts'):
464                             ssh_pids.append(parts[0])
465                         elif parts[1] == "-D":
466                             tree_owner = parts[0]
467
468                 avoid_kill = []
469                 temp = []
470                 #Search for the child process of the pid's collected at the first block.
471                 for process in ssh_pids:
472                     temp = self.search_for_child(process, pids, ppid)
473                     avoid_kill = list(set(temp))
474                 
475                 if len(avoid_kill) > 0:
476                     avoid_kill.append(tree_owner) 
477                 ########################
478
479                 import pickle
480                 with open("/tmp/save.proc", "rb") as pickle_file:
481                     pids = pickle.load(pickle_file)
482                 pids_temp = dict()
483                 ps_aux = "ps aux | awk '{print $2,$11}'"
484                 (out, err), proc = self.execute(ps_aux)
485                 if len(out) != 0:
486                     for line in out.strip().split("\n"):
487                         parts = line.strip().split(" ")
488                         pids_temp[parts[0]] = parts[1]
489                     # creates the difference between the machine pids freezed (pickle) and the actual
490                     # adding the avoided pids filtered above (avoid_kill) to allow users keep process
491                     # alive when using besides ssh connections  
492                     kill_pids = set(pids_temp.items()) - set(pids.items())
493                     # py2/py3 : keep it simple
494                     kill_pids = ' '.join(kill_pids)
495
496                     # removing pids from beside connections and its process
497                     kill_pids = kill_pids.split(' ')
498                     kill_pids = list(set(kill_pids) - set(avoid_kill))
499                     kill_pids = ' '.join(kill_pids)
500
501                     cmd = ("killall tcpdump || /bin/true ; " +
502                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
503                            "kill {} || /bin/true ; ".format(kill_pids))
504                 else:
505                     cmd = ("killall tcpdump || /bin/true ; " +
506                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
507             else:
508                 cmd = ("killall tcpdump || /bin/true ; " +
509                        "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
510
511         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
512
513     def search_for_child(self, pid, pids, ppid, family=[]):
514         """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
515         """
516         family.append(pid)
517         for key, value in enumerate(ppid):
518             if value == pid:
519                 child = pids[key]
520                 self.search_for_child(child, pids, ppid)
521         return family
522         
523     def clean_home(self):
524         """ Cleans all NEPI related folders in the Linux host
525         """
526         self.info("Cleaning up home")
527         
528         cmd = "cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "\
529               .format(self.home_dir)
530
531         return self.execute(cmd, with_lock = True)
532
533     def clean_experiment(self):
534         """ Cleans all experiment related files in the Linux host.
535         It preserves NEPI files and folders that have a multi experiment
536         scope.
537         """
538         self.info("Cleaning up experiment files")
539         
540         cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "\
541               .format(self.exp_dir, self.ec.exp_id)
542         
543         return self.execute(cmd, with_lock = True)
544
545     def execute(self, command,
546                 sudo = False,
547                 env = None,
548                 tty = False,
549                 forward_x11 = False,
550                 retry = 3,
551                 connect_timeout = 30,
552                 strict_host_checking = False,
553                 persistent = True,
554                 blocking = True,
555                 with_lock = False
556     ):
557         """ Notice that this invocation will block until the
558         execution finishes. If this is not the desired behavior,
559         use 'run' instead."""
560
561         if self.localhost:
562             (out, err), proc = execfuncs.lexec(
563                 command, 
564                 user = self.get("username"), # still problem with localhost
565                 sudo = sudo,
566                 env = env)
567         else:
568             if with_lock:
569                 # If the execute command is blocking, we don't want to keep
570                 # the node lock. This lock is used to avoid race conditions
571                 # when creating the ControlMaster sockets. A more elegant
572                 # solution is needed.
573                 with self._node_lock:
574                     (out, err), proc = sshfuncs.rexec(
575                         command, 
576                         host = self.get("hostname"),
577                         user = self.get("username"),
578                         port = self.get("port"),
579                         gwuser = self.get("gatewayUser"),
580                         gw = self.get("gateway"),
581                         agent = True,
582                         sudo = sudo,
583                         identity = self.get("identity"),
584                         server_key = self.get("serverKey"),
585                         env = env,
586                         tty = tty,
587                         forward_x11 = forward_x11,
588                         retry = retry,
589                         connect_timeout = connect_timeout,
590                         persistent = persistent,
591                         blocking = blocking, 
592                         strict_host_checking = strict_host_checking
593                     )
594             else:
595                 (out, err), proc = sshfuncs.rexec(
596                     command, 
597                     host = self.get("hostname"),
598                     user = self.get("username"),
599                     port = self.get("port"),
600                     gwuser = self.get("gatewayUser"),
601                     gw = self.get("gateway"),
602                     agent = True,
603                     sudo = sudo,
604                     identity = self.get("identity"),
605                     server_key = self.get("serverKey"),
606                     env = env,
607                     tty = tty,
608                     forward_x11 = forward_x11,
609                     retry = retry,
610                     connect_timeout = connect_timeout,
611                     persistent = persistent,
612                     blocking = blocking, 
613                     strict_host_checking = strict_host_checking
614                 )
615
616         return (out, err), proc
617
618     def run(self, command, home,
619             create_home = False,
620             pidfile = 'pidfile',
621             stdin = None, 
622             stdout = 'stdout', 
623             stderr = 'stderr', 
624             sudo = False,
625             tty = False,
626             strict_host_checking = False):
627         
628         self.debug("Running command '{}'".format(command))
629         
630         if self.localhost:
631             (out, err), proc = execfuncs.lspawn(
632                 command, pidfile,
633                 home = home, 
634                 create_home = create_home, 
635                 stdin = stdin or '/dev/null',
636                 stdout = stdout or '/dev/null',
637                 stderr = stderr or '/dev/null',
638                 sudo = sudo) 
639         else:
640             with self._node_lock:
641                 (out, err), proc = sshfuncs.rspawn(
642                     command,
643                     pidfile = pidfile,
644                     home = home,
645                     create_home = create_home,
646                     stdin = stdin or '/dev/null',
647                     stdout = stdout or '/dev/null',
648                     stderr = stderr or '/dev/null',
649                     sudo = sudo,
650                     host = self.get("hostname"),
651                     user = self.get("username"),
652                     port = self.get("port"),
653                     gwuser = self.get("gatewayUser"),
654                     gw = self.get("gateway"),
655                     agent = True,
656                     identity = self.get("identity"),
657                     server_key = self.get("serverKey"),
658                     tty = tty,
659                     strict_host_checking = strict_host_checking
660                 )
661
662         return (out, err), proc
663
664     def getpid(self, home, pidfile = "pidfile"):
665         if self.localhost:
666             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
667         else:
668             with self._node_lock:
669                 pidtuple = sshfuncs.rgetpid(
670                     os.path.join(home, pidfile),
671                     host = self.get("hostname"),
672                     user = self.get("username"),
673                     port = self.get("port"),
674                     gwuser = self.get("gatewayUser"),
675                     gw = self.get("gateway"),
676                     agent = True,
677                     identity = self.get("identity"),
678                     server_key = self.get("serverKey"),
679                     strict_host_checking = False
680                 )
681         
682         return pidtuple
683
684     def status(self, pid, ppid):
685         if self.localhost:
686             status = execfuncs.lstatus(pid, ppid)
687         else:
688             with self._node_lock:
689                 status = sshfuncs.rstatus(
690                     pid, ppid,
691                     host = self.get("hostname"),
692                     user = self.get("username"),
693                     port = self.get("port"),
694                     gwuser = self.get("gatewayUser"),
695                     gw = self.get("gateway"),
696                     agent = True,
697                     identity = self.get("identity"),
698                     server_key = self.get("serverKey"),
699                     strict_host_checking = False
700                 )
701            
702         return status
703     
704     def kill(self, pid, ppid, sudo = False):
705         out = err = ""
706         proc = None
707         status = self.status(pid, ppid)
708
709         if status == sshfuncs.ProcStatus.RUNNING:
710             if self.localhost:
711                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
712             else:
713                 with self._node_lock:
714                     (out, err), proc = sshfuncs.rkill(
715                         pid, ppid,
716                         host = self.get("hostname"),
717                         user = self.get("username"),
718                         port = self.get("port"),
719                         gwuser = self.get("gatewayUser"),
720                         gw = self.get("gateway"),
721                         agent = True,
722                         sudo = sudo,
723                         identity = self.get("identity"),
724                         server_key = self.get("serverKey"),
725                         strict_host_checking = False
726                     )
727
728         return (out, err), proc
729
730     def copy(self, src, dst):
731         if self.localhost:
732             (out, err), proc = execfuncs.lcopy(
733                 src, dst, 
734                 recursive = True)
735         else:
736             with self._node_lock:
737                 (out, err), proc = sshfuncs.rcopy(
738                     src, dst, 
739                     port = self.get("port"),
740                     gwuser = self.get("gatewayUser"),
741                     gw = self.get("gateway"),
742                     identity = self.get("identity"),
743                     server_key = self.get("serverKey"),
744                     recursive = True,
745                     strict_host_checking = False)
746
747         return (out, err), proc
748
749     def upload(self, src, dst, text = False, overwrite = True,
750                raise_on_error = True):
751         """ Copy content to destination
752
753         src  string with the content to copy. Can be:
754             - plain text
755             - a string with the path to a local file
756             - a string with a semi-colon separeted list of local files
757             - a string with a local directory
758
759         dst  string with destination path on the remote host (remote is 
760             always self.host)
761
762         text src is text input, it must be stored into a temp file before 
763         uploading
764         """
765         # If source is a string input 
766         f = None
767         if text and not os.path.isfile(src):
768             # src is text input that should be uploaded as file
769             # create a temporal file with the content to upload
770             # in python3 we need to open in binary mode if str is bytes
771             mode = 'w' if isinstance(src, str) else 'wb'
772             f = tempfile.NamedTemporaryFile(mode=mode, delete=False)
773             f.write(src)
774             f.close()
775             src = f.name
776
777         # If dst files should not be overwritten, check that the files do not
778         # exist already
779         if isinstance(src, str):
780             src = [s.strip() for s in src.split(";")]
781     
782         if overwrite == False:
783             src = self.filter_existing_files(src, dst)
784             if not src:
785                 return ("", ""), None
786
787         if not self.localhost:
788             # Build destination as <user>@<server>:<path>
789             dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
790
791         ((out, err), proc) = self.copy(src, dst)
792
793         # clean up temp file
794         if f:
795             os.remove(f.name)
796
797         if err:
798             msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
799             self.error(msg, out, err)
800             
801             msg = "{} out: {} err: {}".format(msg, out, err)
802             if raise_on_error:
803                 raise RuntimeError(msg)
804
805         return ((out, err), proc)
806
807     def download(self, src, dst, raise_on_error = True):
808         if not self.localhost:
809             # Build destination as <user>@<server>:<path>
810             src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
811
812         ((out, err), proc) = self.copy(src, dst)
813
814         if err:
815             msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst) 
816             self.error(msg, out, err)
817
818             if raise_on_error:
819                 raise RuntimeError(msg)
820
821         return ((out, err), proc)
822
823     def install_packages_command(self, packages):
824         command = ""
825         if self.use_rpm:
826             command = rpmfuncs.install_packages_command(self.os, packages)
827         elif self.use_deb:
828             command = debfuncs.install_packages_command(self.os, packages)
829         else:
830             msg = "Error installing packages ( OS not known ) "
831             self.error(msg, self.os)
832             raise RuntimeError(msg)
833
834         return command
835
836     def install_packages(self, packages, home,
837                          run_home = None,
838                          raise_on_error = True):
839         """ Install packages in the Linux host.
840
841         'home' is the directory to upload the package installation script.
842         'run_home' is the directory from where to execute the script.
843         """
844         command = self.install_packages_command(packages)
845
846         run_home = run_home or home
847
848         (out, err), proc = self.run_and_wait(command, run_home, 
849                                              shfile = os.path.join(home, "instpkg.sh"),
850                                              pidfile = "instpkg_pidfile",
851                                              ecodefile = "instpkg_exitcode",
852                                              stdout = "instpkg_stdout", 
853                                              stderr = "instpkg_stderr",
854                                              overwrite = False,
855                                              raise_on_error = raise_on_error)
856
857         return (out, err), proc 
858
859     def remove_packages(self, packages, home, run_home = None,
860                         raise_on_error = True):
861         """ Uninstall packages from the Linux host.
862
863         'home' is the directory to upload the package un-installation script.
864         'run_home' is the directory from where to execute the script.
865         """
866         if self.use_rpm:
867             command = rpmfuncs.remove_packages_command(self.os, packages)
868         elif self.use_deb:
869             command = debfuncs.remove_packages_command(self.os, packages)
870         else:
871             msg = "Error removing packages ( OS not known ) "
872             self.error(msg)
873             raise RuntimeError(msg)
874
875         run_home = run_home or home
876
877         (out, err), proc = self.run_and_wait(command, run_home, 
878                                              shfile = os.path.join(home, "rmpkg.sh"),
879                                              pidfile = "rmpkg_pidfile",
880                                              ecodefile = "rmpkg_exitcode",
881                                              stdout = "rmpkg_stdout", 
882                                              stderr = "rmpkg_stderr",
883                                              overwrite = False,
884                                              raise_on_error = raise_on_error)
885         
886         return (out, err), proc 
887
888     def mkdir(self, paths, clean = False):
889         """ Paths is either a single remote directory path to create,
890         or a list of directories to create.
891         """
892         if clean:
893             self.rmdir(paths)
894
895         if isinstance(paths, str):
896             paths = [paths]
897
898         cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
899
900         return self.execute(cmd, with_lock = True)
901
902     def rmdir(self, paths):
903         """ Paths is either a single remote directory path to delete,
904         or a list of directories to delete.
905         """
906
907         if isinstance(paths, str):
908             paths = [paths]
909
910         cmd = " ; ".join(["rm -rf {}".format(path) for path in paths])
911
912         return self.execute(cmd, with_lock = True)
913     
914     def run_and_wait(self, command, home, 
915                      shfile="cmd.sh",
916                      env=None,
917                      overwrite=True,
918                      wait_run=True,
919                      pidfile="pidfile", 
920                      ecodefile="exitcode", 
921                      stdin=None, 
922                      stdout="stdout", 
923                      stderr="stderr", 
924                      sudo=False,
925                      tty=False,
926                      raise_on_error=True):
927         """
928         Uploads the 'command' to a bash script in the host.
929         Then runs the script detached in background in the host, and
930         busy-waites until the script finishes executing.
931         """
932
933         if not shfile.startswith("/"):
934             shfile = os.path.join(home, shfile)
935
936         self.upload_command(command, 
937                             shfile = shfile, 
938                             ecodefile = ecodefile, 
939                             env = env,
940                             overwrite = overwrite)
941
942         command = "bash {}".format(shfile)
943         # run command in background in remote host
944         (out, err), proc = self.run(command, home, 
945                                     pidfile = pidfile,
946                                     stdin = stdin, 
947                                     stdout = stdout, 
948                                     stderr = stderr, 
949                                     sudo = sudo,
950                                     tty = tty)
951
952         # check no errors occurred
953         if proc.poll():
954             msg = " Failed to run command '{}' ".format(command)
955             self.error(msg, out, err)
956             if raise_on_error:
957                 raise RuntimeError(msg)
958
959         # Wait for pid file to be generated
960         pid, ppid = self.wait_pid(
961             home = home, 
962             pidfile = pidfile, 
963             raise_on_error = raise_on_error)
964
965         if wait_run:
966             # wait until command finishes to execute
967             self.wait_run(pid, ppid)
968             
969             (eout, err), proc = self.check_errors(home,
970                                                   ecodefile = ecodefile,
971                                                   stderr = stderr)
972
973             # Out is what was written in the stderr file
974             if err:
975                 msg = " Failed to run command '{}' ".format(command)
976                 self.error(msg, eout, err)
977
978                 if raise_on_error:
979                     raise RuntimeError(msg)
980
981         (out, oerr), proc = self.check_output(home, stdout)
982         
983         return (out, err), proc
984         
985     def exitcode(self, home, ecodefile = "exitcode"):
986         """
987         Get the exit code of an application.
988         Returns an integer value with the exit code 
989         """
990         (out, err), proc = self.check_output(home, ecodefile)
991
992         # Succeeded to open file, return exit code in the file
993         if proc.wait() == 0:
994             try:
995                 return int(out.strip())
996             except:
997                 # Error in the content of the file!
998                 return ExitCode.CORRUPTFILE
999
1000         # No such file or directory
1001         if proc.returncode == 1:
1002             return ExitCode.FILENOTFOUND
1003         
1004         # Other error from 'cat'
1005         return ExitCode.ERROR
1006
1007     def upload_command(self, command, 
1008                        shfile="cmd.sh",
1009                        ecodefile="exitcode",
1010                        overwrite=True,
1011                        env=None):
1012         """ Saves the command as a bash script file in the remote host, and
1013         forces to save the exit code of the command execution to the ecodefile
1014         """
1015
1016         if not (command.strip().endswith(";") or command.strip().endswith("&")):
1017             command += ";"
1018             
1019         # The exit code of the command will be stored in ecodefile
1020         command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1021                   .format(command=command, ecodefile=ecodefile)
1022
1023         # Export environment
1024         environ = self.format_environment(env)
1025
1026         # Add environ to command
1027         command = environ + command
1028
1029         return self.upload(command, shfile, text=True, overwrite=overwrite)
1030
1031     def format_environment(self, env, inline=False):
1032         """ Formats the environment variables for a command to be executed
1033         either as an inline command
1034         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
1035         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
1036         """
1037         if not env: return ""
1038
1039         # Remove extra white spaces
1040         env = re.sub(r'\s+', ' ', env.strip())
1041
1042         sep = ";" if inline else "\n"
1043         return sep.join([" export {}".format(e) for e in env.split(" ")]) + sep 
1044
1045     def check_errors(self, home, 
1046                      ecodefile = "exitcode", 
1047                      stderr = "stderr"):
1048         """ Checks whether errors occurred while running a command.
1049         It first checks the exit code for the command, and only if the
1050         exit code is an error one it returns the error output.
1051
1052         """
1053         proc = None
1054         err = ""
1055
1056         # get exit code saved in the 'exitcode' file
1057         ecode = self.exitcode(home, ecodefile)
1058
1059         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1060             err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1061         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1062             # The process returned an error code or didn't exist. 
1063             # Check standard error.
1064             (err, eerr), proc = self.check_output(home, stderr)
1065
1066             # If the stderr file was not found, assume nothing bad happened,
1067             # and just ignore the error.
1068             # (cat returns 1 for error "No such file or directory")
1069             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1070                 err = "" 
1071                 
1072         return ("", err), proc
1073     
1074     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1075         """ Waits until the pid file for the command is generated, 
1076             and returns the pid and ppid of the process """
1077         pid = ppid = None
1078         delay = 1.0
1079
1080         for i in range(2):
1081             pidtuple = self.getpid(home = home, pidfile = pidfile)
1082             
1083             if pidtuple:
1084                 pid, ppid = pidtuple
1085                 break
1086             else:
1087                 time.sleep(delay)
1088                 delay = delay * 1.5
1089         else:
1090             msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1091             self.error(msg)
1092     
1093             if raise_on_error:
1094                 raise RuntimeError(msg)
1095
1096         return pid, ppid
1097
1098     def wait_run(self, pid, ppid, trial = 0):
1099         """ wait for a remote process to finish execution """
1100         delay = 1.0
1101
1102         while True:
1103             status = self.status(pid, ppid)
1104             
1105             if status is ProcStatus.FINISHED:
1106                 break
1107             elif status is not ProcStatus.RUNNING:
1108                 delay = delay * 1.5
1109                 time.sleep(delay)
1110                 # If it takes more than 20 seconds to start, then
1111                 # asume something went wrong
1112                 if delay > 20:
1113                     break
1114             else:
1115                 # The app is running, just wait...
1116                 time.sleep(0.5)
1117
1118     def check_output(self, home, filename):
1119         """ Retrives content of file """
1120         (out, err), proc = self.execute(
1121             "cat {}".format(os.path.join(home, filename)), retry = 1, with_lock = True)
1122         return (out, err), proc
1123
1124     def is_alive(self):
1125         """ Checks if host is responsive
1126         """
1127         if self.localhost:
1128             return True
1129
1130         out = err = ""
1131         msg = "Unresponsive host. Wrong answer. "
1132
1133         # The underlying SSH layer will sometimes return an empty
1134         # output (even if the command was executed without errors).
1135         # To work arround this, repeat the operation N times or
1136         # until the result is not empty string
1137         try:
1138             (out, err), proc = self.execute("echo 'ALIVE'",
1139                                             blocking = True,
1140                                             with_lock = True)
1141             
1142             if out.find("ALIVE") > -1:
1143                 return True
1144         except:
1145             trace = traceback.format_exc()
1146             msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1147
1148         self.error(msg, out, err)
1149         return False
1150
1151     def find_home(self):
1152         """ 
1153         Retrieves host home directory
1154         """
1155         # The underlying SSH layer will sometimes return an empty
1156         # output (even if the command was executed without errors).
1157         # To work arround this, repeat the operation N times or
1158         # until the result is not empty string
1159         msg = "Impossible to retrieve HOME directory"
1160         try:
1161             (out, err), proc = self.execute("echo ${HOME}",
1162                                             blocking = True,
1163                                             with_lock = True)
1164             
1165             if out.strip() != "":
1166                 self._home_dir =  out.strip()
1167         except:
1168             trace = traceback.format_exc()
1169             msg = "Impossible to retrieve HOME directory {}".format(trace)
1170
1171         if not self._home_dir:
1172             self.error(msg)
1173             raise RuntimeError(msg)
1174
1175     def filter_existing_files(self, src, dst):
1176         """ Removes files that already exist in the Linux host from src list
1177         """
1178         # construct a dictionary with { dst: src }
1179         dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1180                 if len(src) > 1 else {dst: src[0]}
1181
1182         command = []
1183         for d in dests:
1184             command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1185
1186         command = ";".join(command)
1187
1188         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1189         
1190         for d in dests:
1191             if out.find(d) > -1:
1192                 del dests[d]
1193
1194         if not dests:
1195             return []
1196
1197         return dests.values()
1198