first set of semantic changes for python3
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy, 
21                                      ResourceState)
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33 import traceback
34
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!! 
37
38 class ExitCode:
39     """
40     Error codes that the rexitcode function can return if unable to
41     check the exit code of a spawned process
42     """
43     FILENOTFOUND = -1
44     CORRUPTFILE = -2
45     ERROR = -3
46     OK = 0
47
48 class OSType:
49     """
50     Supported flavors of Linux OS
51     """
52     DEBIAN = 1 
53     UBUNTU = 1 << 1 
54     FEDORA = 1 << 2
55     FEDORA_8 = 1 << 3 | FEDORA 
56     FEDORA_12 = 1 << 4 | FEDORA 
57     FEDORA_14 = 1 << 5 | FEDORA 
58
59 @clsinit_copy
60 class LinuxNode(ResourceManager):
61     """
62     .. class:: Class Args :
63       
64         :param ec: The Experiment controller
65         :type ec: ExperimentController
66         :param guid: guid of the RM
67         :type guid: int
68
69     .. note::
70
71         There are different ways in which commands can be executed using the
72         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
73         'run_and_wait'). 
74         
75         Brief explanation:
76
77             * 'execute' (blocking mode) :  
78
79                      HOW IT WORKS: 'execute', forks a process and run the
80                      command, synchronously, attached to the terminal, in
81                      foreground.
82                      The execute method will block until the command returns
83                      the result on 'out', 'err' (so until it finishes executing).
84   
85                      USAGE: short-lived commands that must be executed attached
86                      to a terminal and in foreground, for which it IS necessary
87                      to block until the command has finished (e.g. if you want
88                      to run 'ls' or 'cat').
89
90             * 'execute' (NON blocking mode - blocking = False) :
91
92                     HOW IT WORKS: Same as before, except that execute method
93                     will return immediately (even if command still running).
94
95                     USAGE: long-lived commands that must be executed attached
96                     to a terminal and in foreground, but for which it is not
97                     necessary to block until the command has finished. (e.g.
98                     start an application using X11 forwarding)
99
100              * 'run' :
101
102                    HOW IT WORKS: Connects to the host ( using SSH if remote)
103                    and launches the command in background, detached from any
104                    terminal (daemonized), and returns. The command continues to
105                    run remotely, but since it is detached from the terminal,
106                    its pipes (stdin, stdout, stderr) can't be redirected to the
107                    console (as normal non detached processes would), and so they
108                    are explicitly redirected to files. The pidfile is created as
109                    part of the process of launching the command. The pidfile
110                    holds the pid and ppid of the process forked in background,
111                    so later on it is possible to check whether the command is still
112                    running.
113
114                     USAGE: long-lived commands that can run detached in background,
115                     for which it is NOT necessary to block (wait) until the command
116                     has finished. (e.g. start an application that is not using X11
117                     forwarding. It can run detached and remotely in background)
118
119              * 'run_and_wait' :
120
121                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122                     the command has finished execution. It also checks whether
123                     errors occurred during runtime by reading the exitcode file,
124                     which contains the exit code of the command that was run
125                     (checking stderr only is not always reliable since many
126                     commands throw debugging info to stderr and the only way to
127                     automatically know whether an error really happened is to
128                     check the process exit code).
129
130                     Another difference with respect to 'run', is that instead
131                     of directly executing the command as a bash command line,
132                     it uploads the command to a bash script and runs the script.
133                     This allows to use the bash script to debug errors, since
134                     it remains at the remote host and can be run manually to
135                     reproduce the error.
136                   
137                     USAGE: medium-lived commands that can run detached in
138                     background, for which it IS necessary to block (wait) until
139                     the command has finished. (e.g. Package installation,
140                     source compilation, file download, etc)
141
142     """
143     _rtype = "linux::Node"
144     _help = "Controls Linux host machines ( either localhost or a host " \
145             "that can be accessed using a SSH key)"
146     _platform = "linux"
147
148     @classmethod
149     def _register_attributes(cls):
150         cls._register_attribute(Attribute(
151             "hostname", "Hostname of the machine",
152             flags = Flags.Design))
153
154         cls._register_attribute(Attribute(
155             "username", "Local account username", 
156             flags = Flags.Credential))
157
158         cls._register_attribute(Attribute(
159             "port", "SSH port",
160             flags = Flags.Design))
161         
162         cls._register_attribute(Attribute(
163             "home",
164             "Experiment home directory to store all experiment related files",
165             flags = Flags.Design))
166         
167         cls._register_attribute(Attribute(
168             "identity", "SSH identity file",
169             flags = Flags.Credential))
170         
171         cls._register_attribute(Attribute(
172             "serverKey", "Server public key", 
173             flags = Flags.Design))
174         
175         cls._register_attribute(Attribute(
176             "cleanHome",
177             "Remove all nepi files and directories "
178             " from node home folder before starting experiment", 
179             type = Types.Bool,
180             default = False,
181             flags = Flags.Design))
182
183         cls._register_attribute(Attribute(
184             "cleanExperiment", "Remove all files and directories " 
185             " from a previous same experiment, before the new experiment starts", 
186             type = Types.Bool,
187             default = False,
188             flags = Flags.Design))
189         
190         cls._register_attribute(Attribute(
191             "cleanProcesses", 
192             "Kill all running processes before starting experiment",
193             type = Types.Bool,
194             default = False,
195             flags = Flags.Design))
196         
197         cls._register_attribute(Attribute(
198             "cleanProcessesAfter", 
199             """Kill all running processes after starting experiment
200             This might be dangerous when using user root""",
201             type = Types.Bool,
202             default = True,
203             flags = Flags.Design))
204         
205         cls._register_attribute(Attribute(
206             "tearDown",
207             "Bash script to be executed before releasing the resource",
208             flags = Flags.Design))
209
210         cls._register_attribute(Attribute(
211             "gatewayUser",
212             "Gateway account username",
213             flags = Flags.Design))
214
215         cls._register_attribute(Attribute(
216             "gateway",
217             "Hostname of the gateway machine",
218             flags = Flags.Design))
219
220         cls._register_attribute(Attribute(
221             "ip",
222             "Linux host public IP address. "
223             "Must not be modified by the user unless hostname is 'localhost'",
224             flags = Flags.Design))
225
226     def __init__(self, ec, guid):
227         super(LinuxNode, self).__init__(ec, guid)
228         self._os = None
229         # home directory at Linux host
230         self._home_dir = ""
231
232         # lock to prevent concurrent applications on the same node,
233         # to execute commands at the same time. There are potential
234         # concurrency issues when using SSH to a same host from 
235         # multiple threads. There are also possible operational 
236         # issues, e.g. an application querying the existence 
237         # of a file or folder prior to its creation, and another 
238         # application creating the same file or folder in between.
239         self._node_lock = threading.Lock()
240         
241     def log_message(self, msg):
242         return " guid {} - host {} - {} "\
243             .format(self.guid, self.get("hostname"), msg)
244
245     @property
246     def home_dir(self):
247         home = self.get("home") or ""
248         if not home.startswith("/"):
249             home = os.path.join(self._home_dir, home) 
250             return home
251
252     @property
253     def nepi_home(self):
254         return os.path.join(self.home_dir, ".nepi")
255
256     @property
257     def usr_dir(self):
258         return os.path.join(self.nepi_home, "nepi-usr")
259
260     @property
261     def lib_dir(self):
262         return os.path.join(self.usr_dir, "lib")
263
264     @property
265     def bin_dir(self):
266         return os.path.join(self.usr_dir, "bin")
267
268     @property
269     def src_dir(self):
270         return os.path.join(self.usr_dir, "src")
271
272     @property
273     def share_dir(self):
274         return os.path.join(self.usr_dir, "share")
275
276     @property
277     def exp_dir(self):
278         return os.path.join(self.nepi_home, "nepi-exp")
279
280     @property
281     def exp_home(self):
282         return os.path.join(self.exp_dir, self.ec.exp_id)
283
284     @property
285     def node_home(self):
286         return os.path.join(self.exp_home, "node-{}".format(self.guid))
287
288     @property
289     def run_home(self):
290         return os.path.join(self.node_home, self.ec.run_id)
291
292     @property
293     def os(self):
294         if self._os:
295             return self._os
296
297         if not self.localhost and not self.get("username"):
298             msg = "Can't resolve OS, insufficient data "
299             self.error(msg)
300             raise RuntimeError(msg)
301
302         out = self.get_os()
303
304         if out.find("Debian") == 0: 
305             self._os = OSType.DEBIAN
306         elif out.find("Ubuntu") ==0:
307             self._os = OSType.UBUNTU
308         elif out.find("Fedora release") == 0:
309             self._os = OSType.FEDORA
310             if out.find("Fedora release 8") == 0:
311                 self._os = OSType.FEDORA_8
312             elif out.find("Fedora release 12") == 0:
313                 self._os = OSType.FEDORA_12
314             elif out.find("Fedora release 14") == 0:
315                 self._os = OSType.FEDORA_14
316         else:
317             msg = "Unsupported OS"
318             self.error(msg, out)
319             raise RuntimeError("{} - {} ".format(msg, out))
320
321         return self._os
322
323     def get_os(self):
324         # The underlying SSH layer will sometimes return an empty
325         # output (even if the command was executed without errors).
326         # To work arround this, repeat the operation N times or
327         # until the result is not empty string
328         out = ""
329         try:
330             (out, err), proc = self.execute("cat /etc/issue", 
331                                             with_lock = True,
332                                             blocking = True)
333         except:
334             trace = traceback.format_exc()
335             msg = "Error detecting OS: {} ".format(trace)
336             self.error(msg, out, err)
337     
338         return out
339
340     @property
341     def use_deb(self):
342         return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
343
344     @property
345     def use_rpm(self):
346         return (self.os & OSType.FEDORA)
347
348     @property
349     def localhost(self):
350         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
351
352     def do_provision(self):
353         # check if host is alive
354         if not self.is_alive():
355             trace = traceback.format_exc()
356             msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
357             self.error(msg)
358             raise RuntimeError(msg)
359
360         self.find_home()
361
362         if self.get("cleanProcesses"):
363             self.clean_processes()
364
365         if self.get("cleanHome"):
366             self.clean_home()
367             
368         if self.get("cleanExperiment"):
369             self.clean_experiment()
370             
371         # Create shared directory structure and node home directory
372         paths = [self.lib_dir, 
373                  self.bin_dir, 
374                  self.src_dir, 
375                  self.share_dir, 
376                  self.node_home]
377
378         self.mkdir(paths)
379
380         # Get Public IP address if possible
381         if not self.get("ip"):
382             try:
383                 ip = sshfuncs.gethostbyname(self.get("hostname"))
384                 self.set("ip", ip)
385             except:
386                 if self.get("gateway") is None:
387                     msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
388                     self.error(msg)
389
390         super(LinuxNode, self).do_provision()
391
392     def do_deploy(self):
393         if self.state == ResourceState.NEW:
394             self.info("Deploying node")
395             self.do_discover()
396             self.do_provision()
397
398         # Node needs to wait until all associated interfaces are 
399         # ready before it can finalize deployment
400         from nepi.resources.linux.interface import LinuxInterface
401         ifaces = self.get_connected(LinuxInterface.get_rtype())
402         for iface in ifaces:
403             if iface.state < ResourceState.READY:
404                 self.ec.schedule(self.reschedule_delay, self.deploy)
405                 return 
406
407         super(LinuxNode, self).do_deploy()
408
409     def do_release(self):
410         rms = self.get_connected()
411         for rm in rms:
412             # Node needs to wait until all associated RMs are released
413             # before it can be released
414             if rm.state != ResourceState.RELEASED:
415                 self.ec.schedule(self.reschedule_delay, self.release)
416                 return 
417
418         tear_down = self.get("tearDown")
419         if tear_down:
420             self.execute(tear_down)
421
422         if self.get("cleanProcessesAfter"):
423             self.clean_processes()
424
425         super(LinuxNode, self).do_release()
426
427     def valid_connection(self, guid):
428         # TODO: Validate!
429         return True
430
431     def clean_processes(self):
432         self.info("Cleaning up processes")
433
434         if self.localhost:
435             return 
436         
437         if self.get("username") != 'root':
438             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
439                    "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
440                    "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
441         else:
442             if self.state >= ResourceState.READY:
443                 ########################
444                 #Collect all process (must change for a more intelligent way)
445                 ppid = []
446                 pids = []
447                 avoid_pids = "ps axjf | awk '{print $1,$2}'"
448                 (out, err), proc = self.execute(avoid_pids)
449                 if len(out) != 0:
450                     for line in out.strip().split("\n"):
451                         parts = line.strip().split(" ")
452                         ppid.append(parts[0])
453                         pids.append(parts[1])
454
455                 #Collect all process below ssh -D
456                 tree_owner = 0
457                 ssh_pids = []
458                 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
459                 (out, err), proc = self.execute(sshs)
460                 if len(out) != 0:
461                     for line in out.strip().split("\n"):
462                         parts = line.strip().split(" ")
463                         if parts[1].startswith('root@pts'):
464                             ssh_pids.append(parts[0])
465                         elif parts[1] == "-D":
466                             tree_owner = parts[0]
467
468                 avoid_kill = []
469                 temp = []
470                 #Search for the child process of the pid's collected at the first block.
471                 for process in ssh_pids:
472                     temp = self.search_for_child(process, pids, ppid)
473                     avoid_kill = list(set(temp))
474                 
475                 if len(avoid_kill) > 0:
476                     avoid_kill.append(tree_owner) 
477                 ########################
478
479                 import pickle
480                 pids = pickle.load(open("/tmp/save.proc", "rb"))
481                 pids_temp = dict()
482                 ps_aux = "ps aux | awk '{print $2,$11}'"
483                 (out, err), proc = self.execute(ps_aux)
484                 if len(out) != 0:
485                     for line in out.strip().split("\n"):
486                         parts = line.strip().split(" ")
487                         pids_temp[parts[0]] = parts[1]
488                     # creates the difference between the machine pids freezed (pickle) and the actual
489                     # adding the avoided pids filtered above (avoid_kill) to allow users keep process
490                     # alive when using besides ssh connections  
491                     kill_pids = set(pids_temp.items()) - set(pids.items())
492                     kill_pids = ' '.join(list(dict(kill_pids).keys()))
493
494                     # removing pids from beside connections and its process
495                     kill_pids = kill_pids.split(' ')
496                     kill_pids = list(set(kill_pids) - set(avoid_kill))
497                     kill_pids = ' '.join(kill_pids)
498
499                     cmd = ("killall tcpdump || /bin/true ; " +
500                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
501                            "kill {} || /bin/true ; ".format(kill_pids))
502                 else:
503                     cmd = ("killall tcpdump || /bin/true ; " +
504                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
505             else:
506                 cmd = ("killall tcpdump || /bin/true ; " +
507                        "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
508
509         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
510
511     def search_for_child(self, pid, pids, ppid, family=[]):
512         """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
513         """
514         family.append(pid)
515         for key, value in enumerate(ppid):
516             if value == pid:
517                 child = pids[key]
518                 self.search_for_child(child, pids, ppid)
519         return family
520         
521     def clean_home(self):
522         """ Cleans all NEPI related folders in the Linux host
523         """
524         self.info("Cleaning up home")
525         
526         cmd = "cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "\
527               .format(self.home_dir)
528
529         return self.execute(cmd, with_lock = True)
530
531     def clean_experiment(self):
532         """ Cleans all experiment related files in the Linux host.
533         It preserves NEPI files and folders that have a multi experiment
534         scope.
535         """
536         self.info("Cleaning up experiment files")
537         
538         cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "\
539               .format(self.exp_dir, self.ec.exp_id)
540         
541         return self.execute(cmd, with_lock = True)
542
543     def execute(self, command,
544                 sudo = False,
545                 env = None,
546                 tty = False,
547                 forward_x11 = False,
548                 retry = 3,
549                 connect_timeout = 30,
550                 strict_host_checking = False,
551                 persistent = True,
552                 blocking = True,
553                 with_lock = False
554     ):
555         """ Notice that this invocation will block until the
556         execution finishes. If this is not the desired behavior,
557         use 'run' instead."""
558
559         if self.localhost:
560             (out, err), proc = execfuncs.lexec(
561                 command, 
562                 user = self.get("username"), # still problem with localhost
563                 sudo = sudo,
564                 env = env)
565         else:
566             if with_lock:
567                 # If the execute command is blocking, we don't want to keep
568                 # the node lock. This lock is used to avoid race conditions
569                 # when creating the ControlMaster sockets. A more elegant
570                 # solution is needed.
571                 with self._node_lock:
572                     (out, err), proc = sshfuncs.rexec(
573                         command, 
574                         host = self.get("hostname"),
575                         user = self.get("username"),
576                         port = self.get("port"),
577                         gwuser = self.get("gatewayUser"),
578                         gw = self.get("gateway"),
579                         agent = True,
580                         sudo = sudo,
581                         identity = self.get("identity"),
582                         server_key = self.get("serverKey"),
583                         env = env,
584                         tty = tty,
585                         forward_x11 = forward_x11,
586                         retry = retry,
587                         connect_timeout = connect_timeout,
588                         persistent = persistent,
589                         blocking = blocking, 
590                         strict_host_checking = strict_host_checking
591                     )
592             else:
593                 (out, err), proc = sshfuncs.rexec(
594                     command, 
595                     host = self.get("hostname"),
596                     user = self.get("username"),
597                     port = self.get("port"),
598                     gwuser = self.get("gatewayUser"),
599                     gw = self.get("gateway"),
600                     agent = True,
601                     sudo = sudo,
602                     identity = self.get("identity"),
603                     server_key = self.get("serverKey"),
604                     env = env,
605                     tty = tty,
606                     forward_x11 = forward_x11,
607                     retry = retry,
608                     connect_timeout = connect_timeout,
609                     persistent = persistent,
610                     blocking = blocking, 
611                     strict_host_checking = strict_host_checking
612                 )
613
614         return (out, err), proc
615
616     def run(self, command, home,
617             create_home = False,
618             pidfile = 'pidfile',
619             stdin = None, 
620             stdout = 'stdout', 
621             stderr = 'stderr', 
622             sudo = False,
623             tty = False,
624             strict_host_checking = False):
625         
626         self.debug("Running command '{}'".format(command))
627         
628         if self.localhost:
629             (out, err), proc = execfuncs.lspawn(
630                 command, pidfile,
631                 home = home, 
632                 create_home = create_home, 
633                 stdin = stdin or '/dev/null',
634                 stdout = stdout or '/dev/null',
635                 stderr = stderr or '/dev/null',
636                 sudo = sudo) 
637         else:
638             with self._node_lock:
639                 (out, err), proc = sshfuncs.rspawn(
640                     command,
641                     pidfile = pidfile,
642                     home = home,
643                     create_home = create_home,
644                     stdin = stdin or '/dev/null',
645                     stdout = stdout or '/dev/null',
646                     stderr = stderr or '/dev/null',
647                     sudo = sudo,
648                     host = self.get("hostname"),
649                     user = self.get("username"),
650                     port = self.get("port"),
651                     gwuser = self.get("gatewayUser"),
652                     gw = self.get("gateway"),
653                     agent = True,
654                     identity = self.get("identity"),
655                     server_key = self.get("serverKey"),
656                     tty = tty,
657                     strict_host_checking = strict_host_checking
658                 )
659
660         return (out, err), proc
661
662     def getpid(self, home, pidfile = "pidfile"):
663         if self.localhost:
664             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
665         else:
666             with self._node_lock:
667                 pidtuple = sshfuncs.rgetpid(
668                     os.path.join(home, pidfile),
669                     host = self.get("hostname"),
670                     user = self.get("username"),
671                     port = self.get("port"),
672                     gwuser = self.get("gatewayUser"),
673                     gw = self.get("gateway"),
674                     agent = True,
675                     identity = self.get("identity"),
676                     server_key = self.get("serverKey"),
677                     strict_host_checking = False
678                 )
679         
680         return pidtuple
681
682     def status(self, pid, ppid):
683         if self.localhost:
684             status = execfuncs.lstatus(pid, ppid)
685         else:
686             with self._node_lock:
687                 status = sshfuncs.rstatus(
688                     pid, ppid,
689                     host = self.get("hostname"),
690                     user = self.get("username"),
691                     port = self.get("port"),
692                     gwuser = self.get("gatewayUser"),
693                     gw = self.get("gateway"),
694                     agent = True,
695                     identity = self.get("identity"),
696                     server_key = self.get("serverKey"),
697                     strict_host_checking = False
698                 )
699            
700         return status
701     
702     def kill(self, pid, ppid, sudo = False):
703         out = err = ""
704         proc = None
705         status = self.status(pid, ppid)
706
707         if status == sshfuncs.ProcStatus.RUNNING:
708             if self.localhost:
709                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
710             else:
711                 with self._node_lock:
712                     (out, err), proc = sshfuncs.rkill(
713                         pid, ppid,
714                         host = self.get("hostname"),
715                         user = self.get("username"),
716                         port = self.get("port"),
717                         gwuser = self.get("gatewayUser"),
718                         gw = self.get("gateway"),
719                         agent = True,
720                         sudo = sudo,
721                         identity = self.get("identity"),
722                         server_key = self.get("serverKey"),
723                         strict_host_checking = False
724                     )
725
726         return (out, err), proc
727
728     def copy(self, src, dst):
729         if self.localhost:
730             (out, err), proc = execfuncs.lcopy(
731                 src, dst, 
732                 recursive = True)
733         else:
734             with self._node_lock:
735                 (out, err), proc = sshfuncs.rcopy(
736                     src, dst, 
737                     port = self.get("port"),
738                     gwuser = self.get("gatewayUser"),
739                     gw = self.get("gateway"),
740                     identity = self.get("identity"),
741                     server_key = self.get("serverKey"),
742                     recursive = True,
743                     strict_host_checking = False)
744
745         return (out, err), proc
746
747     def upload(self, src, dst, text = False, overwrite = True,
748                raise_on_error = True):
749         """ Copy content to destination
750
751         src  string with the content to copy. Can be:
752             - plain text
753             - a string with the path to a local file
754             - a string with a semi-colon separeted list of local files
755             - a string with a local directory
756
757         dst  string with destination path on the remote host (remote is 
758             always self.host)
759
760         text src is text input, it must be stored into a temp file before 
761         uploading
762         """
763         # If source is a string input 
764         f = None
765         if text and not os.path.isfile(src):
766             # src is text input that should be uploaded as file
767             # create a temporal file with the content to upload
768             # in python3 we need to open in binary mode if str is bytes
769             mode = 'w' if isinstance(src, str) else 'wb'
770             f = tempfile.NamedTemporaryFile(mode=mode, delete=False)
771             f.write(src)
772             f.close()
773             src = f.name
774
775         # If dst files should not be overwritten, check that the files do not
776         # exits already
777         if isinstance(src, str):
778             src = list(map(str.strip, src.split(";")))
779     
780         if overwrite == False:
781             src = self.filter_existing_files(src, dst)
782             if not src:
783                 return ("", ""), None
784
785         if not self.localhost:
786             # Build destination as <user>@<server>:<path>
787             dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
788
789         ((out, err), proc) = self.copy(src, dst)
790
791         # clean up temp file
792         if f:
793             os.remove(f.name)
794
795         if err:
796             msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
797             self.error(msg, out, err)
798             
799             msg = "{} out: {} err: {}".format(msg, out, err)
800             if raise_on_error:
801                 raise RuntimeError(msg)
802
803         return ((out, err), proc)
804
805     def download(self, src, dst, raise_on_error = True):
806         if not self.localhost:
807             # Build destination as <user>@<server>:<path>
808             src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
809
810         ((out, err), proc) = self.copy(src, dst)
811
812         if err:
813             msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst) 
814             self.error(msg, out, err)
815
816             if raise_on_error:
817                 raise RuntimeError(msg)
818
819         return ((out, err), proc)
820
821     def install_packages_command(self, packages):
822         command = ""
823         if self.use_rpm:
824             command = rpmfuncs.install_packages_command(self.os, packages)
825         elif self.use_deb:
826             command = debfuncs.install_packages_command(self.os, packages)
827         else:
828             msg = "Error installing packages ( OS not known ) "
829             self.error(msg, self.os)
830             raise RuntimeError(msg)
831
832         return command
833
834     def install_packages(self, packages, home,
835                          run_home = None,
836                          raise_on_error = True):
837         """ Install packages in the Linux host.
838
839         'home' is the directory to upload the package installation script.
840         'run_home' is the directory from where to execute the script.
841         """
842         command = self.install_packages_command(packages)
843
844         run_home = run_home or home
845
846         (out, err), proc = self.run_and_wait(command, run_home, 
847                                              shfile = os.path.join(home, "instpkg.sh"),
848                                              pidfile = "instpkg_pidfile",
849                                              ecodefile = "instpkg_exitcode",
850                                              stdout = "instpkg_stdout", 
851                                              stderr = "instpkg_stderr",
852                                              overwrite = False,
853                                              raise_on_error = raise_on_error)
854
855         return (out, err), proc 
856
857     def remove_packages(self, packages, home, run_home = None,
858                         raise_on_error = True):
859         """ Uninstall packages from the Linux host.
860
861         'home' is the directory to upload the package un-installation script.
862         'run_home' is the directory from where to execute the script.
863         """
864         if self.use_rpm:
865             command = rpmfuncs.remove_packages_command(self.os, packages)
866         elif self.use_deb:
867             command = debfuncs.remove_packages_command(self.os, packages)
868         else:
869             msg = "Error removing packages ( OS not known ) "
870             self.error(msg)
871             raise RuntimeError(msg)
872
873         run_home = run_home or home
874
875         (out, err), proc = self.run_and_wait(command, run_home, 
876                                              shfile = os.path.join(home, "rmpkg.sh"),
877                                              pidfile = "rmpkg_pidfile",
878                                              ecodefile = "rmpkg_exitcode",
879                                              stdout = "rmpkg_stdout", 
880                                              stderr = "rmpkg_stderr",
881                                              overwrite = False,
882                                              raise_on_error = raise_on_error)
883         
884         return (out, err), proc 
885
886     def mkdir(self, paths, clean = False):
887         """ Paths is either a single remote directory path to create,
888         or a list of directories to create.
889         """
890         if clean:
891             self.rmdir(paths)
892
893         if isinstance(paths, str):
894             paths = [paths]
895
896         cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
897
898         return self.execute(cmd, with_lock = True)
899
900     def rmdir(self, paths):
901         """ Paths is either a single remote directory path to delete,
902         or a list of directories to delete.
903         """
904
905         if isinstance(paths, str):
906             paths = [paths]
907
908         cmd = " ; ".join(["rm -rf {}".format(path) for path in paths])
909
910         return self.execute(cmd, with_lock = True)
911     
912     def run_and_wait(self, command, home, 
913                      shfile="cmd.sh",
914                      env=None,
915                      overwrite=True,
916                      wait_run=True,
917                      pidfile="pidfile", 
918                      ecodefile="exitcode", 
919                      stdin=None, 
920                      stdout="stdout", 
921                      stderr="stderr", 
922                      sudo=False,
923                      tty=False,
924                      raise_on_error=True):
925         """
926         Uploads the 'command' to a bash script in the host.
927         Then runs the script detached in background in the host, and
928         busy-waites until the script finishes executing.
929         """
930
931         if not shfile.startswith("/"):
932             shfile = os.path.join(home, shfile)
933
934         self.upload_command(command, 
935                             shfile = shfile, 
936                             ecodefile = ecodefile, 
937                             env = env,
938                             overwrite = overwrite)
939
940         command = "bash {}".format(shfile)
941         # run command in background in remote host
942         (out, err), proc = self.run(command, home, 
943                                     pidfile = pidfile,
944                                     stdin = stdin, 
945                                     stdout = stdout, 
946                                     stderr = stderr, 
947                                     sudo = sudo,
948                                     tty = tty)
949
950         # check no errors occurred
951         if proc.poll():
952             msg = " Failed to run command '{}' ".format(command)
953             self.error(msg, out, err)
954             if raise_on_error:
955                 raise RuntimeError(msg)
956
957         # Wait for pid file to be generated
958         pid, ppid = self.wait_pid(
959             home = home, 
960             pidfile = pidfile, 
961             raise_on_error = raise_on_error)
962
963         if wait_run:
964             # wait until command finishes to execute
965             self.wait_run(pid, ppid)
966             
967             (eout, err), proc = self.check_errors(home,
968                                                   ecodefile = ecodefile,
969                                                   stderr = stderr)
970
971             # Out is what was written in the stderr file
972             if err:
973                 msg = " Failed to run command '{}' ".format(command)
974                 self.error(msg, eout, err)
975
976                 if raise_on_error:
977                     raise RuntimeError(msg)
978
979         (out, oerr), proc = self.check_output(home, stdout)
980         
981         return (out, err), proc
982         
983     def exitcode(self, home, ecodefile = "exitcode"):
984         """
985         Get the exit code of an application.
986         Returns an integer value with the exit code 
987         """
988         (out, err), proc = self.check_output(home, ecodefile)
989
990         # Succeeded to open file, return exit code in the file
991         if proc.wait() == 0:
992             try:
993                 return int(out.strip())
994             except:
995                 # Error in the content of the file!
996                 return ExitCode.CORRUPTFILE
997
998         # No such file or directory
999         if proc.returncode == 1:
1000             return ExitCode.FILENOTFOUND
1001         
1002         # Other error from 'cat'
1003         return ExitCode.ERROR
1004
1005     def upload_command(self, command, 
1006                        shfile="cmd.sh",
1007                        ecodefile="exitcode",
1008                        overwrite=True,
1009                        env=None):
1010         """ Saves the command as a bash script file in the remote host, and
1011         forces to save the exit code of the command execution to the ecodefile
1012         """
1013
1014         if not (command.strip().endswith(";") or command.strip().endswith("&")):
1015             command += ";"
1016             
1017         # The exit code of the command will be stored in ecodefile
1018         command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1019                   .format(command=command, ecodefile=ecodefile)
1020
1021         # Export environment
1022         environ = self.format_environment(env)
1023
1024         # Add environ to command
1025         command = environ + command
1026
1027         return self.upload(command, shfile, text=True, overwrite=overwrite)
1028
1029     def format_environment(self, env, inline=False):
1030         """ Formats the environment variables for a command to be executed
1031         either as an inline command
1032         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
1033         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
1034         """
1035         if not env: return ""
1036
1037         # Remove extra white spaces
1038         env = re.sub(r'\s+', ' ', env.strip())
1039
1040         sep = ";" if inline else "\n"
1041         return sep.join([" export {}".format(e) for e in env.split(" ")]) + sep 
1042
1043     def check_errors(self, home, 
1044                      ecodefile = "exitcode", 
1045                      stderr = "stderr"):
1046         """ Checks whether errors occurred while running a command.
1047         It first checks the exit code for the command, and only if the
1048         exit code is an error one it returns the error output.
1049
1050         """
1051         proc = None
1052         err = ""
1053
1054         # get exit code saved in the 'exitcode' file
1055         ecode = self.exitcode(home, ecodefile)
1056
1057         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1058             err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1059         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1060             # The process returned an error code or didn't exist. 
1061             # Check standard error.
1062             (err, eerr), proc = self.check_output(home, stderr)
1063
1064             # If the stderr file was not found, assume nothing bad happened,
1065             # and just ignore the error.
1066             # (cat returns 1 for error "No such file or directory")
1067             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1068                 err = "" 
1069                 
1070         return ("", err), proc
1071     
1072     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1073         """ Waits until the pid file for the command is generated, 
1074             and returns the pid and ppid of the process """
1075         pid = ppid = None
1076         delay = 1.0
1077
1078         for i in range(2):
1079             pidtuple = self.getpid(home = home, pidfile = pidfile)
1080             
1081             if pidtuple:
1082                 pid, ppid = pidtuple
1083                 break
1084             else:
1085                 time.sleep(delay)
1086                 delay = delay * 1.5
1087         else:
1088             msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1089             self.error(msg)
1090     
1091             if raise_on_error:
1092                 raise RuntimeError(msg)
1093
1094         return pid, ppid
1095
1096     def wait_run(self, pid, ppid, trial = 0):
1097         """ wait for a remote process to finish execution """
1098         delay = 1.0
1099
1100         while True:
1101             status = self.status(pid, ppid)
1102             
1103             if status is ProcStatus.FINISHED:
1104                 break
1105             elif status is not ProcStatus.RUNNING:
1106                 delay = delay * 1.5
1107                 time.sleep(delay)
1108                 # If it takes more than 20 seconds to start, then
1109                 # asume something went wrong
1110                 if delay > 20:
1111                     break
1112             else:
1113                 # The app is running, just wait...
1114                 time.sleep(0.5)
1115
1116     def check_output(self, home, filename):
1117         """ Retrives content of file """
1118         (out, err), proc = self.execute(
1119             "cat {}".format(os.path.join(home, filename)), retry = 1, with_lock = True)
1120         return (out, err), proc
1121
1122     def is_alive(self):
1123         """ Checks if host is responsive
1124         """
1125         if self.localhost:
1126             return True
1127
1128         out = err = ""
1129         msg = "Unresponsive host. Wrong answer. "
1130
1131         # The underlying SSH layer will sometimes return an empty
1132         # output (even if the command was executed without errors).
1133         # To work arround this, repeat the operation N times or
1134         # until the result is not empty string
1135         try:
1136             (out, err), proc = self.execute("echo 'ALIVE'",
1137                                             blocking = True,
1138                                             with_lock = True)
1139             
1140             if out.find("ALIVE") > -1:
1141                 return True
1142         except:
1143             trace = traceback.format_exc()
1144             msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1145
1146         self.error(msg, out, err)
1147         return False
1148
1149     def find_home(self):
1150         """ 
1151         Retrieves host home directory
1152         """
1153         # The underlying SSH layer will sometimes return an empty
1154         # output (even if the command was executed without errors).
1155         # To work arround this, repeat the operation N times or
1156         # until the result is not empty string
1157         msg = "Impossible to retrieve HOME directory"
1158         try:
1159             (out, err), proc = self.execute("echo ${HOME}",
1160                                             blocking = True,
1161                                             with_lock = True)
1162             
1163             if out.strip() != "":
1164                 self._home_dir =  out.strip()
1165         except:
1166             trace = traceback.format_exc()
1167             msg = "Impossible to retrieve HOME directory {}".format(trace)
1168
1169         if not self._home_dir:
1170             self.error(msg)
1171             raise RuntimeError(msg)
1172
1173     def filter_existing_files(self, src, dst):
1174         """ Removes files that already exist in the Linux host from src list
1175         """
1176         # construct a dictionary with { dst: src }
1177         dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1178                 if len(src) > 1 else {dst: src[0]}
1179
1180         command = []
1181         for d in list(dests.keys()):
1182             command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1183
1184         command = ";".join(command)
1185
1186         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1187         
1188         for d in list(dests.keys()):
1189             if out.find(d) > -1:
1190                 del dests[d]
1191
1192         if not dests:
1193             return []
1194
1195         return list(dests.values())
1196