revisited declaration of attributes for node and application
[nepi.git] / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy, 
21                                      ResourceState)
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33 import traceback
34
35 from six import PY3
36
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!! 
39
40 class ExitCode:
41     """
42     Error codes that the rexitcode function can return if unable to
43     check the exit code of a spawned process
44     """
45     FILENOTFOUND = -1
46     CORRUPTFILE = -2
47     ERROR = -3
48     OK = 0
49
50 class OSType:
51     """
52     Supported flavors of Linux OS
53     """
54     DEBIAN = 1 
55     UBUNTU = 1 << 1 
56     FEDORA = 1 << 2
57     FEDORA_8 = 1 << 3 | FEDORA 
58     FEDORA_12 = 1 << 4 | FEDORA 
59     FEDORA_14 = 1 << 5 | FEDORA 
60
61 @clsinit_copy
62 class LinuxNode(ResourceManager):
63     """
64     .. class:: Class Args :
65       
66         :param ec: The Experiment controller
67         :type ec: ExperimentController
68         :param guid: guid of the RM
69         :type guid: int
70
71     .. note::
72
73         There are different ways in which commands can be executed using the
74         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
75         'run_and_wait'). 
76         
77         Brief explanation:
78
79             * 'execute' (blocking mode) :  
80
81                      HOW IT WORKS: 'execute', forks a process and run the
82                      command, synchronously, attached to the terminal, in
83                      foreground.
84                      The execute method will block until the command returns
85                      the result on 'out', 'err' (so until it finishes executing).
86   
87                      USAGE: short-lived commands that must be executed attached
88                      to a terminal and in foreground, for which it IS necessary
89                      to block until the command has finished (e.g. if you want
90                      to run 'ls' or 'cat').
91
92             * 'execute' (NON blocking mode - blocking = False) :
93
94                     HOW IT WORKS: Same as before, except that execute method
95                     will return immediately (even if command still running).
96
97                     USAGE: long-lived commands that must be executed attached
98                     to a terminal and in foreground, but for which it is not
99                     necessary to block until the command has finished. (e.g.
100                     start an application using X11 forwarding)
101
102              * 'run' :
103
104                    HOW IT WORKS: Connects to the host ( using SSH if remote)
105                    and launches the command in background, detached from any
106                    terminal (daemonized), and returns. The command continues to
107                    run remotely, but since it is detached from the terminal,
108                    its pipes (stdin, stdout, stderr) can't be redirected to the
109                    console (as normal non detached processes would), and so they
110                    are explicitly redirected to files. The pidfile is created as
111                    part of the process of launching the command. The pidfile
112                    holds the pid and ppid of the process forked in background,
113                    so later on it is possible to check whether the command is still
114                    running.
115
116                     USAGE: long-lived commands that can run detached in background,
117                     for which it is NOT necessary to block (wait) until the command
118                     has finished. (e.g. start an application that is not using X11
119                     forwarding. It can run detached and remotely in background)
120
121              * 'run_and_wait' :
122
123                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124                     the command has finished execution. It also checks whether
125                     errors occurred during runtime by reading the exitcode file,
126                     which contains the exit code of the command that was run
127                     (checking stderr only is not always reliable since many
128                     commands throw debugging info to stderr and the only way to
129                     automatically know whether an error really happened is to
130                     check the process exit code).
131
132                     Another difference with respect to 'run', is that instead
133                     of directly executing the command as a bash command line,
134                     it uploads the command to a bash script and runs the script.
135                     This allows to use the bash script to debug errors, since
136                     it remains at the remote host and can be run manually to
137                     reproduce the error.
138                   
139                     USAGE: medium-lived commands that can run detached in
140                     background, for which it IS necessary to block (wait) until
141                     the command has finished. (e.g. Package installation,
142                     source compilation, file download, etc)
143
144     """
145     _rtype = "linux::Node"
146     _help = "Controls Linux host machines ( either localhost or a host " \
147             "that can be accessed using a SSH key)"
148     _platform = "linux"
149
150     @classmethod
151     def _register_attributes(cls):
152         cls._register_attribute(
153             Attribute("hostname",
154                       "Hostname of the machine",
155                       flags = Flags.Design))
156         cls._register_attribute(
157             Attribute("username",
158                       "Local account username", 
159                       flags = Flags.Credential))
160         cls._register_attribute(
161             Attribute("port",
162                       "SSH port",
163                       flags = Flags.Design))
164         cls._register_attribute(
165             Attribute("home",
166                       "Experiment home directory to store all experiment related files",
167                       flags = Flags.Design))
168         cls._register_attribute(
169             Attribute("identity",
170                       "SSH identity file",
171                       flags = Flags.Credential))
172         cls._register_attribute(
173             Attribute("serverKey",
174                       "Server public key", 
175                       flags = Flags.Design))
176         cls._register_attribute(
177             Attribute("cleanHome",
178                       "Remove all nepi files and directories "
179                       " from node home folder before starting experiment", 
180                       type = Types.Bool,
181                       default = False,
182             flags = Flags.Design))
183         cls._register_attribute(
184             Attribute("cleanExperiment",
185                       "Remove all files and directories " 
186                       " from a previous same experiment, before the new experiment starts", 
187                       type = Types.Bool,
188                       default = False,
189                       flags = Flags.Design))
190         cls._register_attribute(
191             Attribute("cleanProcesses",
192                       "Kill all running processes before starting experiment",
193                       type = Types.Bool,
194                       default = False,
195                       flags = Flags.Design))
196         cls._register_attribute(
197             Attribute("cleanProcessesAfter",
198                       "Kill all running processes after starting experiment"
199                       "NOTE: This might be dangerous when using user root",
200                       type = Types.Bool,
201                       default = True,
202                       flags = Flags.Design))
203         cls._register_attribute(
204             Attribute("tearDown",
205                       "Bash script to be executed before releasing the resource",
206                       flags = Flags.Design))
207         cls._register_attribute(
208             Attribute("gatewayUser",
209                       "Gateway account username",
210                       flags = Flags.Design))
211         cls._register_attribute(
212             Attribute("gateway",
213                       "Hostname of the gateway machine",
214                       flags = Flags.Design))
215         cls._register_attribute(
216             Attribute("ip",
217                       "Linux host public IP address. "
218                       "Must not be modified by the user unless hostname is 'localhost'",
219                       flags = Flags.Design))
220
221     def __init__(self, ec, guid):
222         super(LinuxNode, self).__init__(ec, guid)
223         self._os = None
224         # home directory at Linux host
225         self._home_dir = ""
226
227         # lock to prevent concurrent applications on the same node,
228         # to execute commands at the same time. There are potential
229         # concurrency issues when using SSH to a same host from 
230         # multiple threads. There are also possible operational 
231         # issues, e.g. an application querying the existence 
232         # of a file or folder prior to its creation, and another 
233         # application creating the same file or folder in between.
234         self._node_lock = threading.Lock()
235         
236     def log_message(self, msg):
237         return " guid {} - host {} - {} "\
238             .format(self.guid, self.get("hostname"), msg)
239
240     @property
241     def home_dir(self):
242         home = self.get("home") or ""
243         if not home.startswith("/"):
244             home = os.path.join(self._home_dir, home) 
245             return home
246
247     @property
248     def nepi_home(self):
249         return os.path.join(self.home_dir, ".nepi")
250
251     @property
252     def usr_dir(self):
253         return os.path.join(self.nepi_home, "nepi-usr")
254
255     @property
256     def lib_dir(self):
257         return os.path.join(self.usr_dir, "lib")
258
259     @property
260     def bin_dir(self):
261         return os.path.join(self.usr_dir, "bin")
262
263     @property
264     def src_dir(self):
265         return os.path.join(self.usr_dir, "src")
266
267     @property
268     def share_dir(self):
269         return os.path.join(self.usr_dir, "share")
270
271     @property
272     def exp_dir(self):
273         return os.path.join(self.nepi_home, "nepi-exp")
274
275     @property
276     def exp_home(self):
277         return os.path.join(self.exp_dir, self.ec.exp_id)
278
279     @property
280     def node_home(self):
281         return os.path.join(self.exp_home, "node-{}".format(self.guid))
282
283     @property
284     def run_home(self):
285         return os.path.join(self.node_home, self.ec.run_id)
286
287     @property
288     def os(self):
289         if self._os:
290             return self._os
291
292         if not self.localhost and not self.get("username"):
293             msg = "Can't resolve OS, insufficient data "
294             self.error(msg)
295             raise RuntimeError(msg)
296
297         out = self.get_os()
298
299         if out.find("Debian") == 0: 
300             self._os = OSType.DEBIAN
301         elif out.find("Ubuntu") ==0:
302             self._os = OSType.UBUNTU
303         elif out.find("Fedora release") == 0:
304             self._os = OSType.FEDORA
305             if out.find("Fedora release 8") == 0:
306                 self._os = OSType.FEDORA_8
307             elif out.find("Fedora release 12") == 0:
308                 self._os = OSType.FEDORA_12
309             elif out.find("Fedora release 14") == 0:
310                 self._os = OSType.FEDORA_14
311         else:
312             msg = "Unsupported OS"
313             self.error(msg, out)
314             raise RuntimeError("{} - {} ".format(msg, out))
315
316         return self._os
317
318     def get_os(self):
319         # The underlying SSH layer will sometimes return an empty
320         # output (even if the command was executed without errors).
321         # To work arround this, repeat the operation N times or
322         # until the result is not empty string
323         out = ""
324         try:
325             (out, err), proc = self.execute("cat /etc/issue", 
326                                             with_lock = True,
327                                             blocking = True)
328         except:
329             trace = traceback.format_exc()
330             msg = "Error detecting OS: {} ".format(trace)
331             self.error(msg, out, err)
332     
333         return out
334
335     @property
336     def use_deb(self):
337         return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
338
339     @property
340     def use_rpm(self):
341         return (self.os & OSType.FEDORA)
342
343     @property
344     def localhost(self):
345         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
346
347     def do_provision(self):
348         # check if host is alive
349         if not self.is_alive():
350             trace = traceback.format_exc()
351             msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
352             self.error(msg)
353             raise RuntimeError(msg)
354
355         self.find_home()
356
357         if self.get("cleanProcesses"):
358             self.clean_processes()
359
360         if self.get("cleanHome"):
361             self.clean_home()
362             
363         if self.get("cleanExperiment"):
364             self.clean_experiment()
365             
366         # Create shared directory structure and node home directory
367         paths = [self.lib_dir, 
368                  self.bin_dir, 
369                  self.src_dir, 
370                  self.share_dir, 
371                  self.node_home]
372
373         self.mkdir(paths)
374
375         # Get Public IP address if possible
376         if not self.get("ip"):
377             try:
378                 ip = sshfuncs.gethostbyname(self.get("hostname"))
379                 self.set("ip", ip)
380             except:
381                 if self.get("gateway") is None:
382                     msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
383                     self.error(msg)
384
385         super(LinuxNode, self).do_provision()
386
387     def do_deploy(self):
388         if self.state == ResourceState.NEW:
389             self.info("Deploying node")
390             self.do_discover()
391             self.do_provision()
392
393         # Node needs to wait until all associated interfaces are 
394         # ready before it can finalize deployment
395         from nepi.resources.linux.interface import LinuxInterface
396         ifaces = self.get_connected(LinuxInterface.get_rtype())
397         for iface in ifaces:
398             if iface.state < ResourceState.READY:
399                 self.ec.schedule(self.reschedule_delay, self.deploy)
400                 return 
401
402         super(LinuxNode, self).do_deploy()
403
404     def do_release(self):
405         rms = self.get_connected()
406         for rm in rms:
407             # Node needs to wait until all associated RMs are released
408             # before it can be released
409             if rm.state != ResourceState.RELEASED:
410                 self.ec.schedule(self.reschedule_delay, self.release)
411                 return 
412
413         tear_down = self.get("tearDown")
414         if tear_down:
415             self.execute(tear_down)
416
417         if self.get("cleanProcessesAfter"):
418             self.clean_processes()
419
420         super(LinuxNode, self).do_release()
421
422     def valid_connection(self, guid):
423         # TODO: Validate!
424         return True
425
426     def clean_processes(self):
427         self.info("Cleaning up processes")
428
429         if self.localhost:
430             return 
431         
432         if self.get("username") != 'root':
433             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
434                    "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
435                    "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
436         else:
437             if self.state >= ResourceState.READY:
438                 ########################
439                 #Collect all process (must change for a more intelligent way)
440                 ppid = []
441                 pids = []
442                 avoid_pids = "ps axjf | awk '{print $1,$2}'"
443                 (out, err), proc = self.execute(avoid_pids)
444                 if len(out) != 0:
445                     for line in out.strip().split("\n"):
446                         parts = line.strip().split(" ")
447                         ppid.append(parts[0])
448                         pids.append(parts[1])
449
450                 #Collect all process below ssh -D
451                 tree_owner = 0
452                 ssh_pids = []
453                 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
454                 (out, err), proc = self.execute(sshs)
455                 if len(out) != 0:
456                     for line in out.strip().split("\n"):
457                         parts = line.strip().split(" ")
458                         if parts[1].startswith('root@pts'):
459                             ssh_pids.append(parts[0])
460                         elif parts[1] == "-D":
461                             tree_owner = parts[0]
462
463                 avoid_kill = []
464                 temp = []
465                 #Search for the child process of the pid's collected at the first block.
466                 for process in ssh_pids:
467                     temp = self.search_for_child(process, pids, ppid)
468                     avoid_kill = list(set(temp))
469                 
470                 if len(avoid_kill) > 0:
471                     avoid_kill.append(tree_owner) 
472                 ########################
473
474                 import pickle
475                 with open("/tmp/save.proc", "rb") as pickle_file:
476                     pids = pickle.load(pickle_file)
477                 pids_temp = dict()
478                 ps_aux = "ps aux | awk '{print $2,$11}'"
479                 (out, err), proc = self.execute(ps_aux)
480                 if len(out) != 0:
481                     for line in out.strip().split("\n"):
482                         parts = line.strip().split(" ")
483                         pids_temp[parts[0]] = parts[1]
484                     # creates the difference between the machine pids freezed (pickle) and the actual
485                     # adding the avoided pids filtered above (avoid_kill) to allow users keep process
486                     # alive when using besides ssh connections  
487                     kill_pids = set(pids_temp.items()) - set(pids.items())
488                     # py2/py3 : keep it simple
489                     kill_pids = ' '.join(kill_pids)
490
491                     # removing pids from beside connections and its process
492                     kill_pids = kill_pids.split(' ')
493                     kill_pids = list(set(kill_pids) - set(avoid_kill))
494                     kill_pids = ' '.join(kill_pids)
495
496                     cmd = ("killall tcpdump || /bin/true ; " +
497                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
498                            "kill {} || /bin/true ; ".format(kill_pids))
499                 else:
500                     cmd = ("killall tcpdump || /bin/true ; " +
501                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
502             else:
503                 cmd = ("killall tcpdump || /bin/true ; " +
504                        "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
505
506         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
507
508     def search_for_child(self, pid, pids, ppid, family=[]):
509         """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
510         """
511         family.append(pid)
512         for key, value in enumerate(ppid):
513             if value == pid:
514                 child = pids[key]
515                 self.search_for_child(child, pids, ppid)
516         return family
517         
518     def clean_home(self):
519         """ Cleans all NEPI related folders in the Linux host
520         """
521         self.info("Cleaning up home")
522         
523         cmd = "cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "\
524               .format(self.home_dir)
525
526         return self.execute(cmd, with_lock = True)
527
528     def clean_experiment(self):
529         """ Cleans all experiment related files in the Linux host.
530         It preserves NEPI files and folders that have a multi experiment
531         scope.
532         """
533         self.info("Cleaning up experiment files")
534         
535         cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "\
536               .format(self.exp_dir, self.ec.exp_id)
537         
538         return self.execute(cmd, with_lock = True)
539
540     def execute(self, command,
541                 sudo = False,
542                 env = None,
543                 tty = False,
544                 forward_x11 = False,
545                 retry = 3,
546                 connect_timeout = 30,
547                 strict_host_checking = False,
548                 persistent = True,
549                 blocking = True,
550                 with_lock = False
551     ):
552         """ Notice that this invocation will block until the
553         execution finishes. If this is not the desired behavior,
554         use 'run' instead."""
555
556         if self.localhost:
557             (out, err), proc = execfuncs.lexec(
558                 command, 
559                 user = self.get("username"), # still problem with localhost
560                 sudo = sudo,
561                 env = env)
562         else:
563             if with_lock:
564                 # If the execute command is blocking, we don't want to keep
565                 # the node lock. This lock is used to avoid race conditions
566                 # when creating the ControlMaster sockets. A more elegant
567                 # solution is needed.
568                 with self._node_lock:
569                     (out, err), proc = sshfuncs.rexec(
570                         command, 
571                         host = self.get("hostname"),
572                         user = self.get("username"),
573                         port = self.get("port"),
574                         gwuser = self.get("gatewayUser"),
575                         gw = self.get("gateway"),
576                         agent = True,
577                         sudo = sudo,
578                         identity = self.get("identity"),
579                         server_key = self.get("serverKey"),
580                         env = env,
581                         tty = tty,
582                         forward_x11 = forward_x11,
583                         retry = retry,
584                         connect_timeout = connect_timeout,
585                         persistent = persistent,
586                         blocking = blocking, 
587                         strict_host_checking = strict_host_checking
588                     )
589             else:
590                 (out, err), proc = sshfuncs.rexec(
591                     command, 
592                     host = self.get("hostname"),
593                     user = self.get("username"),
594                     port = self.get("port"),
595                     gwuser = self.get("gatewayUser"),
596                     gw = self.get("gateway"),
597                     agent = True,
598                     sudo = sudo,
599                     identity = self.get("identity"),
600                     server_key = self.get("serverKey"),
601                     env = env,
602                     tty = tty,
603                     forward_x11 = forward_x11,
604                     retry = retry,
605                     connect_timeout = connect_timeout,
606                     persistent = persistent,
607                     blocking = blocking, 
608                     strict_host_checking = strict_host_checking
609                 )
610
611         return (out, err), proc
612
613     def run(self, command, home,
614             create_home = False,
615             pidfile = 'pidfile',
616             stdin = None, 
617             stdout = 'stdout', 
618             stderr = 'stderr', 
619             sudo = False,
620             tty = False,
621             strict_host_checking = False):
622         
623         self.debug("Running command '{}'".format(command))
624         
625         if self.localhost:
626             (out, err), proc = execfuncs.lspawn(
627                 command, pidfile,
628                 home = home, 
629                 create_home = create_home, 
630                 stdin = stdin or '/dev/null',
631                 stdout = stdout or '/dev/null',
632                 stderr = stderr or '/dev/null',
633                 sudo = sudo) 
634         else:
635             with self._node_lock:
636                 (out, err), proc = sshfuncs.rspawn(
637                     command,
638                     pidfile = pidfile,
639                     home = home,
640                     create_home = create_home,
641                     stdin = stdin or '/dev/null',
642                     stdout = stdout or '/dev/null',
643                     stderr = stderr or '/dev/null',
644                     sudo = sudo,
645                     host = self.get("hostname"),
646                     user = self.get("username"),
647                     port = self.get("port"),
648                     gwuser = self.get("gatewayUser"),
649                     gw = self.get("gateway"),
650                     agent = True,
651                     identity = self.get("identity"),
652                     server_key = self.get("serverKey"),
653                     tty = tty,
654                     strict_host_checking = strict_host_checking
655                 )
656
657         return (out, err), proc
658
659     def getpid(self, home, pidfile = "pidfile"):
660         if self.localhost:
661             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
662         else:
663             with self._node_lock:
664                 pidtuple = sshfuncs.rgetpid(
665                     os.path.join(home, pidfile),
666                     host = self.get("hostname"),
667                     user = self.get("username"),
668                     port = self.get("port"),
669                     gwuser = self.get("gatewayUser"),
670                     gw = self.get("gateway"),
671                     agent = True,
672                     identity = self.get("identity"),
673                     server_key = self.get("serverKey"),
674                     strict_host_checking = False
675                 )
676         
677         return pidtuple
678
679     def status(self, pid, ppid):
680         if self.localhost:
681             status = execfuncs.lstatus(pid, ppid)
682         else:
683             with self._node_lock:
684                 status = sshfuncs.rstatus(
685                     pid, ppid,
686                     host = self.get("hostname"),
687                     user = self.get("username"),
688                     port = self.get("port"),
689                     gwuser = self.get("gatewayUser"),
690                     gw = self.get("gateway"),
691                     agent = True,
692                     identity = self.get("identity"),
693                     server_key = self.get("serverKey"),
694                     strict_host_checking = False
695                 )
696            
697         return status
698     
699     def kill(self, pid, ppid, sudo = False):
700         out = err = ""
701         proc = None
702         status = self.status(pid, ppid)
703
704         if status == sshfuncs.ProcStatus.RUNNING:
705             if self.localhost:
706                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
707             else:
708                 with self._node_lock:
709                     (out, err), proc = sshfuncs.rkill(
710                         pid, ppid,
711                         host = self.get("hostname"),
712                         user = self.get("username"),
713                         port = self.get("port"),
714                         gwuser = self.get("gatewayUser"),
715                         gw = self.get("gateway"),
716                         agent = True,
717                         sudo = sudo,
718                         identity = self.get("identity"),
719                         server_key = self.get("serverKey"),
720                         strict_host_checking = False
721                     )
722
723         return (out, err), proc
724
725     def copy(self, src, dst):
726         if self.localhost:
727             (out, err), proc = execfuncs.lcopy(
728                 src, dst, 
729                 recursive = True)
730         else:
731             with self._node_lock:
732                 (out, err), proc = sshfuncs.rcopy(
733                     src, dst, 
734                     port = self.get("port"),
735                     gwuser = self.get("gatewayUser"),
736                     gw = self.get("gateway"),
737                     identity = self.get("identity"),
738                     server_key = self.get("serverKey"),
739                     recursive = True,
740                     strict_host_checking = False)
741
742         return (out, err), proc
743
744     def upload(self, src, dst, text = False, overwrite = True,
745                raise_on_error = True):
746         """ Copy content to destination
747
748         src  string with the content to copy. Can be:
749             - plain text
750             - a string with the path to a local file
751             - a string with a semi-colon separeted list of local files
752             - a string with a local directory
753
754         dst  string with destination path on the remote host (remote is 
755             always self.host)
756
757         text src is text input, it must be stored into a temp file before 
758         uploading
759         """
760         # If source is a string input 
761         f = None
762         if text and not os.path.isfile(src):
763             # src is text input that should be uploaded as file
764             # create a temporal file with the content to upload
765             # in python3 we need to open in binary mode if str is bytes
766             mode = 'w' if isinstance(src, str) else 'wb'
767             f = tempfile.NamedTemporaryFile(mode=mode, delete=False)
768             f.write(src)
769             f.close()
770             src = f.name
771
772         # If dst files should not be overwritten, check that the files do not
773         # exist already
774         if isinstance(src, str):
775             src = [s.strip() for s in src.split(";")]
776     
777         if overwrite == False:
778             src = self.filter_existing_files(src, dst)
779             if not src:
780                 return ("", ""), None
781
782         if not self.localhost:
783             # Build destination as <user>@<server>:<path>
784             dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
785
786         ((out, err), proc) = self.copy(src, dst)
787
788         # clean up temp file
789         if f:
790             os.remove(f.name)
791
792         if err:
793             msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
794             self.error(msg, out, err)
795             
796             msg = "{} out: {} err: {}".format(msg, out, err)
797             if raise_on_error:
798                 raise RuntimeError(msg)
799
800         return ((out, err), proc)
801
802     def download(self, src, dst, raise_on_error = True):
803         if not self.localhost:
804             # Build destination as <user>@<server>:<path>
805             src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
806
807         ((out, err), proc) = self.copy(src, dst)
808
809         if err:
810             msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst) 
811             self.error(msg, out, err)
812
813             if raise_on_error:
814                 raise RuntimeError(msg)
815
816         return ((out, err), proc)
817
818     def install_packages_command(self, packages):
819         command = ""
820         if self.use_rpm:
821             command = rpmfuncs.install_packages_command(self.os, packages)
822         elif self.use_deb:
823             command = debfuncs.install_packages_command(self.os, packages)
824         else:
825             msg = "Error installing packages ( OS not known ) "
826             self.error(msg, self.os)
827             raise RuntimeError(msg)
828
829         return command
830
831     def install_packages(self, packages, home,
832                          run_home = None,
833                          raise_on_error = True):
834         """ Install packages in the Linux host.
835
836         'home' is the directory to upload the package installation script.
837         'run_home' is the directory from where to execute the script.
838         """
839         command = self.install_packages_command(packages)
840
841         run_home = run_home or home
842
843         (out, err), proc = self.run_and_wait(command, run_home, 
844                                              shfile = os.path.join(home, "instpkg.sh"),
845                                              pidfile = "instpkg_pidfile",
846                                              ecodefile = "instpkg_exitcode",
847                                              stdout = "instpkg_stdout", 
848                                              stderr = "instpkg_stderr",
849                                              overwrite = False,
850                                              raise_on_error = raise_on_error)
851
852         return (out, err), proc 
853
854     def remove_packages(self, packages, home, run_home = None,
855                         raise_on_error = True):
856         """ Uninstall packages from the Linux host.
857
858         'home' is the directory to upload the package un-installation script.
859         'run_home' is the directory from where to execute the script.
860         """
861         if self.use_rpm:
862             command = rpmfuncs.remove_packages_command(self.os, packages)
863         elif self.use_deb:
864             command = debfuncs.remove_packages_command(self.os, packages)
865         else:
866             msg = "Error removing packages ( OS not known ) "
867             self.error(msg)
868             raise RuntimeError(msg)
869
870         run_home = run_home or home
871
872         (out, err), proc = self.run_and_wait(command, run_home, 
873                                              shfile = os.path.join(home, "rmpkg.sh"),
874                                              pidfile = "rmpkg_pidfile",
875                                              ecodefile = "rmpkg_exitcode",
876                                              stdout = "rmpkg_stdout", 
877                                              stderr = "rmpkg_stderr",
878                                              overwrite = False,
879                                              raise_on_error = raise_on_error)
880         
881         return (out, err), proc 
882
883     def mkdir(self, paths, clean = False):
884         """ Paths is either a single remote directory path to create,
885         or a list of directories to create.
886         """
887         if clean:
888             self.rmdir(paths)
889
890         if isinstance(paths, str):
891             paths = [paths]
892
893         cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
894
895         return self.execute(cmd, with_lock = True)
896
897     def rmdir(self, paths):
898         """ Paths is either a single remote directory path to delete,
899         or a list of directories to delete.
900         """
901
902         if isinstance(paths, str):
903             paths = [paths]
904
905         cmd = " ; ".join(["rm -rf {}".format(path) for path in paths])
906
907         return self.execute(cmd, with_lock = True)
908     
909     def run_and_wait(self, command, home, 
910                      shfile="cmd.sh",
911                      env=None,
912                      overwrite=True,
913                      wait_run=True,
914                      pidfile="pidfile", 
915                      ecodefile="exitcode", 
916                      stdin=None, 
917                      stdout="stdout", 
918                      stderr="stderr", 
919                      sudo=False,
920                      tty=False,
921                      raise_on_error=True):
922         """
923         Uploads the 'command' to a bash script in the host.
924         Then runs the script detached in background in the host, and
925         busy-waites until the script finishes executing.
926         """
927
928         if not shfile.startswith("/"):
929             shfile = os.path.join(home, shfile)
930
931         self.upload_command(command, 
932                             shfile = shfile, 
933                             ecodefile = ecodefile, 
934                             env = env,
935                             overwrite = overwrite)
936
937         command = "bash {}".format(shfile)
938         # run command in background in remote host
939         (out, err), proc = self.run(command, home, 
940                                     pidfile = pidfile,
941                                     stdin = stdin, 
942                                     stdout = stdout, 
943                                     stderr = stderr, 
944                                     sudo = sudo,
945                                     tty = tty)
946
947         # check no errors occurred
948         if proc.poll():
949             msg = " Failed to run command '{}' ".format(command)
950             self.error(msg, out, err)
951             if raise_on_error:
952                 raise RuntimeError(msg)
953
954         # Wait for pid file to be generated
955         pid, ppid = self.wait_pid(
956             home = home, 
957             pidfile = pidfile, 
958             raise_on_error = raise_on_error)
959
960         if wait_run:
961             # wait until command finishes to execute
962             self.wait_run(pid, ppid)
963             
964             (eout, err), proc = self.check_errors(home,
965                                                   ecodefile = ecodefile,
966                                                   stderr = stderr)
967
968             # Out is what was written in the stderr file
969             if err:
970                 msg = " Failed to run command '{}' ".format(command)
971                 self.error(msg, eout, err)
972
973                 if raise_on_error:
974                     raise RuntimeError(msg)
975
976         (out, oerr), proc = self.check_output(home, stdout)
977         
978         return (out, err), proc
979         
980     def exitcode(self, home, ecodefile = "exitcode"):
981         """
982         Get the exit code of an application.
983         Returns an integer value with the exit code 
984         """
985         (out, err), proc = self.check_output(home, ecodefile)
986
987         # Succeeded to open file, return exit code in the file
988         if proc.wait() == 0:
989             try:
990                 return int(out.strip())
991             except:
992                 # Error in the content of the file!
993                 return ExitCode.CORRUPTFILE
994
995         # No such file or directory
996         if proc.returncode == 1:
997             return ExitCode.FILENOTFOUND
998         
999         # Other error from 'cat'
1000         return ExitCode.ERROR
1001
1002     def upload_command(self, command, 
1003                        shfile="cmd.sh",
1004                        ecodefile="exitcode",
1005                        overwrite=True,
1006                        env=None):
1007         """ Saves the command as a bash script file in the remote host, and
1008         forces to save the exit code of the command execution to the ecodefile
1009         """
1010
1011         if not (command.strip().endswith(";") or command.strip().endswith("&")):
1012             command += ";"
1013             
1014         # The exit code of the command will be stored in ecodefile
1015         command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1016                   .format(command=command, ecodefile=ecodefile)
1017
1018         # Export environment
1019         environ = self.format_environment(env)
1020
1021         # Add environ to command
1022         command = environ + command
1023
1024         return self.upload(command, shfile, text=True, overwrite=overwrite)
1025
1026     def format_environment(self, env, inline=False):
1027         """ Formats the environment variables for a command to be executed
1028         either as an inline command
1029         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
1030         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
1031         """
1032         if not env: return ""
1033
1034         # Remove extra white spaces
1035         env = re.sub(r'\s+', ' ', env.strip())
1036
1037         sep = ";" if inline else "\n"
1038         return sep.join([" export {}".format(e) for e in env.split(" ")]) + sep 
1039
1040     def check_errors(self, home, 
1041                      ecodefile = "exitcode", 
1042                      stderr = "stderr"):
1043         """ Checks whether errors occurred while running a command.
1044         It first checks the exit code for the command, and only if the
1045         exit code is an error one it returns the error output.
1046
1047         """
1048         proc = None
1049         err = ""
1050
1051         # get exit code saved in the 'exitcode' file
1052         ecode = self.exitcode(home, ecodefile)
1053
1054         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1055             err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1056         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1057             # The process returned an error code or didn't exist. 
1058             # Check standard error.
1059             (err, eerr), proc = self.check_output(home, stderr)
1060
1061             # If the stderr file was not found, assume nothing bad happened,
1062             # and just ignore the error.
1063             # (cat returns 1 for error "No such file or directory")
1064             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1065                 err = "" 
1066                 
1067         return ("", err), proc
1068     
1069     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1070         """ Waits until the pid file for the command is generated, 
1071             and returns the pid and ppid of the process """
1072         pid = ppid = None
1073         delay = 1.0
1074
1075         for i in range(2):
1076             pidtuple = self.getpid(home = home, pidfile = pidfile)
1077             
1078             if pidtuple:
1079                 pid, ppid = pidtuple
1080                 break
1081             else:
1082                 time.sleep(delay)
1083                 delay = delay * 1.5
1084         else:
1085             msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1086             self.error(msg)
1087     
1088             if raise_on_error:
1089                 raise RuntimeError(msg)
1090
1091         return pid, ppid
1092
1093     def wait_run(self, pid, ppid, trial = 0):
1094         """ wait for a remote process to finish execution """
1095         delay = 1.0
1096
1097         while True:
1098             status = self.status(pid, ppid)
1099             
1100             if status is ProcStatus.FINISHED:
1101                 break
1102             elif status is not ProcStatus.RUNNING:
1103                 delay = delay * 1.5
1104                 time.sleep(delay)
1105                 # If it takes more than 20 seconds to start, then
1106                 # asume something went wrong
1107                 if delay > 20:
1108                     break
1109             else:
1110                 # The app is running, just wait...
1111                 time.sleep(0.5)
1112
1113     def check_output(self, home, filename):
1114         """ Retrives content of file """
1115         (out, err), proc = self.execute(
1116             "cat {}".format(os.path.join(home, filename)), retry = 1, with_lock = True)
1117         return (out, err), proc
1118
1119     def is_alive(self):
1120         """ Checks if host is responsive
1121         """
1122         if self.localhost:
1123             return True
1124
1125         out = err = ""
1126         msg = "Unresponsive host. Wrong answer. "
1127
1128         # The underlying SSH layer will sometimes return an empty
1129         # output (even if the command was executed without errors).
1130         # To work arround this, repeat the operation N times or
1131         # until the result is not empty string
1132         try:
1133             (out, err), proc = self.execute("echo 'ALIVE'",
1134                                             blocking = True,
1135                                             with_lock = True)
1136             
1137             if out.find("ALIVE") > -1:
1138                 return True
1139         except:
1140             trace = traceback.format_exc()
1141             msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1142
1143         self.error(msg, out, err)
1144         return False
1145
1146     def find_home(self):
1147         """ 
1148         Retrieves host home directory
1149         """
1150         # The underlying SSH layer will sometimes return an empty
1151         # output (even if the command was executed without errors).
1152         # To work arround this, repeat the operation N times or
1153         # until the result is not empty string
1154         msg = "Impossible to retrieve HOME directory"
1155         try:
1156             (out, err), proc = self.execute("echo ${HOME}",
1157                                             blocking = True,
1158                                             with_lock = True)
1159             
1160             if out.strip() != "":
1161                 self._home_dir =  out.strip()
1162         except:
1163             trace = traceback.format_exc()
1164             msg = "Impossible to retrieve HOME directory {}".format(trace)
1165
1166         if not self._home_dir:
1167             self.error(msg)
1168             raise RuntimeError(msg)
1169
1170     def filter_existing_files(self, src, dst):
1171         """ Removes files that already exist in the Linux host from src list
1172         """
1173         # construct a dictionary with { dst: src }
1174         dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1175                 if len(src) > 1 else {dst: src[0]}
1176
1177         command = []
1178         for d in dests:
1179             command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1180
1181         command = ";".join(command)
1182
1183         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1184         
1185         # avoid RuntimeError that would result from
1186         # changing loop subject during iteration
1187         keys = list(dests.keys())
1188         for d in keys:
1189             if out.find(d) > -1:
1190                 del dests[d]
1191
1192         if not dests:
1193             return []
1194
1195         retcod = dests.values()
1196         if PY3: retcod = list(retcod)
1197         return retcod