56cc7152f71c84eb2f786ae2fc9fb3a6bb67eb97
[nepi.git] / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy, 
21                                      ResourceState)
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33 import traceback
34
35 from six import PY3
36
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!! 
39
40 class ExitCode:
41     """
42     Error codes that the rexitcode function can return if unable to
43     check the exit code of a spawned process
44     """
45     FILENOTFOUND = -1
46     CORRUPTFILE = -2
47     ERROR = -3
48     OK = 0
49
50 class OSType:
51     """
52     Supported flavors of Linux OS
53     """
54     DEBIAN = 1 
55     UBUNTU = 1 << 1 
56     FEDORA = 1 << 2
57     FEDORA_8 = 1 << 3 | FEDORA 
58     FEDORA_12 = 1 << 4 | FEDORA 
59     FEDORA_14 = 1 << 5 | FEDORA 
60
61 @clsinit_copy
62 class LinuxNode(ResourceManager):
63     """
64     .. class:: Class Args :
65       
66         :param ec: The Experiment controller
67         :type ec: ExperimentController
68         :param guid: guid of the RM
69         :type guid: int
70
71     .. note::
72
73         There are different ways in which commands can be executed using the
74         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
75         'run_and_wait'). 
76         
77         Brief explanation:
78
79             * 'execute' (blocking mode) :  
80
81                      HOW IT WORKS: 'execute', forks a process and run the
82                      command, synchronously, attached to the terminal, in
83                      foreground.
84                      The execute method will block until the command returns
85                      the result on 'out', 'err' (so until it finishes executing).
86   
87                      USAGE: short-lived commands that must be executed attached
88                      to a terminal and in foreground, for which it IS necessary
89                      to block until the command has finished (e.g. if you want
90                      to run 'ls' or 'cat').
91
92             * 'execute' (NON blocking mode - blocking = False) :
93
94                     HOW IT WORKS: Same as before, except that execute method
95                     will return immediately (even if command still running).
96
97                     USAGE: long-lived commands that must be executed attached
98                     to a terminal and in foreground, but for which it is not
99                     necessary to block until the command has finished. (e.g.
100                     start an application using X11 forwarding)
101
102              * 'run' :
103
104                    HOW IT WORKS: Connects to the host ( using SSH if remote)
105                    and launches the command in background, detached from any
106                    terminal (daemonized), and returns. The command continues to
107                    run remotely, but since it is detached from the terminal,
108                    its pipes (stdin, stdout, stderr) can't be redirected to the
109                    console (as normal non detached processes would), and so they
110                    are explicitly redirected to files. The pidfile is created as
111                    part of the process of launching the command. The pidfile
112                    holds the pid and ppid of the process forked in background,
113                    so later on it is possible to check whether the command is still
114                    running.
115
116                     USAGE: long-lived commands that can run detached in background,
117                     for which it is NOT necessary to block (wait) until the command
118                     has finished. (e.g. start an application that is not using X11
119                     forwarding. It can run detached and remotely in background)
120
121              * 'run_and_wait' :
122
123                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124                     the command has finished execution. It also checks whether
125                     errors occurred during runtime by reading the exitcode file,
126                     which contains the exit code of the command that was run
127                     (checking stderr only is not always reliable since many
128                     commands throw debugging info to stderr and the only way to
129                     automatically know whether an error really happened is to
130                     check the process exit code).
131
132                     Another difference with respect to 'run', is that instead
133                     of directly executing the command as a bash command line,
134                     it uploads the command to a bash script and runs the script.
135                     This allows to use the bash script to debug errors, since
136                     it remains at the remote host and can be run manually to
137                     reproduce the error.
138                   
139                     USAGE: medium-lived commands that can run detached in
140                     background, for which it IS necessary to block (wait) until
141                     the command has finished. (e.g. Package installation,
142                     source compilation, file download, etc)
143
144     """
145     _rtype = "linux::Node"
146     _help = "Controls Linux host machines ( either localhost or a host " \
147             "that can be accessed using a SSH key)"
148     _platform = "linux"
149
150     @classmethod
151     def _register_attributes(cls):
152         cls._register_attribute(Attribute(
153             "hostname", "Hostname of the machine",
154             flags = Flags.Design))
155
156         cls._register_attribute(Attribute(
157             "username", "Local account username", 
158             flags = Flags.Credential))
159
160         cls._register_attribute(Attribute(
161             "port", "SSH port",
162             flags = Flags.Design))
163         
164         cls._register_attribute(Attribute(
165             "home",
166             "Experiment home directory to store all experiment related files",
167             flags = Flags.Design))
168         
169         cls._register_attribute(Attribute(
170             "identity", "SSH identity file",
171             flags = Flags.Credential))
172         
173         cls._register_attribute(Attribute(
174             "serverKey", "Server public key", 
175             flags = Flags.Design))
176         
177         cls._register_attribute(Attribute(
178             "cleanHome",
179             "Remove all nepi files and directories "
180             " from node home folder before starting experiment", 
181             type = Types.Bool,
182             default = False,
183             flags = Flags.Design))
184
185         cls._register_attribute(Attribute(
186             "cleanExperiment", "Remove all files and directories " 
187             " from a previous same experiment, before the new experiment starts", 
188             type = Types.Bool,
189             default = False,
190             flags = Flags.Design))
191         
192         cls._register_attribute(Attribute(
193             "cleanProcesses", 
194             "Kill all running processes before starting experiment",
195             type = Types.Bool,
196             default = False,
197             flags = Flags.Design))
198         
199         cls._register_attribute(Attribute(
200             "cleanProcessesAfter", 
201             """Kill all running processes after starting experiment
202             This might be dangerous when using user root""",
203             type = Types.Bool,
204             default = True,
205             flags = Flags.Design))
206         
207         cls._register_attribute(Attribute(
208             "tearDown",
209             "Bash script to be executed before releasing the resource",
210             flags = Flags.Design))
211
212         cls._register_attribute(Attribute(
213             "gatewayUser",
214             "Gateway account username",
215             flags = Flags.Design))
216
217         cls._register_attribute(Attribute(
218             "gateway",
219             "Hostname of the gateway machine",
220             flags = Flags.Design))
221
222         cls._register_attribute(Attribute(
223             "ip",
224             "Linux host public IP address. "
225             "Must not be modified by the user unless hostname is 'localhost'",
226             flags = Flags.Design))
227
228     def __init__(self, ec, guid):
229         super(LinuxNode, self).__init__(ec, guid)
230         self._os = None
231         # home directory at Linux host
232         self._home_dir = ""
233
234         # lock to prevent concurrent applications on the same node,
235         # to execute commands at the same time. There are potential
236         # concurrency issues when using SSH to a same host from 
237         # multiple threads. There are also possible operational 
238         # issues, e.g. an application querying the existence 
239         # of a file or folder prior to its creation, and another 
240         # application creating the same file or folder in between.
241         self._node_lock = threading.Lock()
242         
243     def log_message(self, msg):
244         return " guid {} - host {} - {} "\
245             .format(self.guid, self.get("hostname"), msg)
246
247     @property
248     def home_dir(self):
249         home = self.get("home") or ""
250         if not home.startswith("/"):
251             home = os.path.join(self._home_dir, home) 
252             return home
253
254     @property
255     def nepi_home(self):
256         return os.path.join(self.home_dir, ".nepi")
257
258     @property
259     def usr_dir(self):
260         return os.path.join(self.nepi_home, "nepi-usr")
261
262     @property
263     def lib_dir(self):
264         return os.path.join(self.usr_dir, "lib")
265
266     @property
267     def bin_dir(self):
268         return os.path.join(self.usr_dir, "bin")
269
270     @property
271     def src_dir(self):
272         return os.path.join(self.usr_dir, "src")
273
274     @property
275     def share_dir(self):
276         return os.path.join(self.usr_dir, "share")
277
278     @property
279     def exp_dir(self):
280         return os.path.join(self.nepi_home, "nepi-exp")
281
282     @property
283     def exp_home(self):
284         return os.path.join(self.exp_dir, self.ec.exp_id)
285
286     @property
287     def node_home(self):
288         return os.path.join(self.exp_home, "node-{}".format(self.guid))
289
290     @property
291     def run_home(self):
292         return os.path.join(self.node_home, self.ec.run_id)
293
294     @property
295     def os(self):
296         if self._os:
297             return self._os
298
299         if not self.localhost and not self.get("username"):
300             msg = "Can't resolve OS, insufficient data "
301             self.error(msg)
302             raise RuntimeError(msg)
303
304         out = self.get_os()
305
306         if out.find("Debian") == 0: 
307             self._os = OSType.DEBIAN
308         elif out.find("Ubuntu") ==0:
309             self._os = OSType.UBUNTU
310         elif out.find("Fedora release") == 0:
311             self._os = OSType.FEDORA
312             if out.find("Fedora release 8") == 0:
313                 self._os = OSType.FEDORA_8
314             elif out.find("Fedora release 12") == 0:
315                 self._os = OSType.FEDORA_12
316             elif out.find("Fedora release 14") == 0:
317                 self._os = OSType.FEDORA_14
318         else:
319             msg = "Unsupported OS"
320             self.error(msg, out)
321             raise RuntimeError("{} - {} ".format(msg, out))
322
323         return self._os
324
325     def get_os(self):
326         # The underlying SSH layer will sometimes return an empty
327         # output (even if the command was executed without errors).
328         # To work arround this, repeat the operation N times or
329         # until the result is not empty string
330         out = ""
331         try:
332             (out, err), proc = self.execute("cat /etc/issue", 
333                                             with_lock = True,
334                                             blocking = True)
335         except:
336             trace = traceback.format_exc()
337             msg = "Error detecting OS: {} ".format(trace)
338             self.error(msg, out, err)
339     
340         return out
341
342     @property
343     def use_deb(self):
344         return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
345
346     @property
347     def use_rpm(self):
348         return (self.os & OSType.FEDORA)
349
350     @property
351     def localhost(self):
352         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
353
354     def do_provision(self):
355         # check if host is alive
356         if not self.is_alive():
357             trace = traceback.format_exc()
358             msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
359             self.error(msg)
360             raise RuntimeError(msg)
361
362         self.find_home()
363
364         if self.get("cleanProcesses"):
365             self.clean_processes()
366
367         if self.get("cleanHome"):
368             self.clean_home()
369             
370         if self.get("cleanExperiment"):
371             self.clean_experiment()
372             
373         # Create shared directory structure and node home directory
374         paths = [self.lib_dir, 
375                  self.bin_dir, 
376                  self.src_dir, 
377                  self.share_dir, 
378                  self.node_home]
379
380         self.mkdir(paths)
381
382         # Get Public IP address if possible
383         if not self.get("ip"):
384             try:
385                 ip = sshfuncs.gethostbyname(self.get("hostname"))
386                 self.set("ip", ip)
387             except:
388                 if self.get("gateway") is None:
389                     msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
390                     self.error(msg)
391
392         super(LinuxNode, self).do_provision()
393
394     def do_deploy(self):
395         if self.state == ResourceState.NEW:
396             self.info("Deploying node")
397             self.do_discover()
398             self.do_provision()
399
400         # Node needs to wait until all associated interfaces are 
401         # ready before it can finalize deployment
402         from nepi.resources.linux.interface import LinuxInterface
403         ifaces = self.get_connected(LinuxInterface.get_rtype())
404         for iface in ifaces:
405             if iface.state < ResourceState.READY:
406                 self.ec.schedule(self.reschedule_delay, self.deploy)
407                 return 
408
409         super(LinuxNode, self).do_deploy()
410
411     def do_release(self):
412         rms = self.get_connected()
413         for rm in rms:
414             # Node needs to wait until all associated RMs are released
415             # before it can be released
416             if rm.state != ResourceState.RELEASED:
417                 self.ec.schedule(self.reschedule_delay, self.release)
418                 return 
419
420         tear_down = self.get("tearDown")
421         if tear_down:
422             self.execute(tear_down)
423
424         if self.get("cleanProcessesAfter"):
425             self.clean_processes()
426
427         super(LinuxNode, self).do_release()
428
429     def valid_connection(self, guid):
430         # TODO: Validate!
431         return True
432
433     def clean_processes(self):
434         self.info("Cleaning up processes")
435
436         if self.localhost:
437             return 
438         
439         if self.get("username") != 'root':
440             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
441                    "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
442                    "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
443         else:
444             if self.state >= ResourceState.READY:
445                 ########################
446                 #Collect all process (must change for a more intelligent way)
447                 ppid = []
448                 pids = []
449                 avoid_pids = "ps axjf | awk '{print $1,$2}'"
450                 (out, err), proc = self.execute(avoid_pids)
451                 if len(out) != 0:
452                     for line in out.strip().split("\n"):
453                         parts = line.strip().split(" ")
454                         ppid.append(parts[0])
455                         pids.append(parts[1])
456
457                 #Collect all process below ssh -D
458                 tree_owner = 0
459                 ssh_pids = []
460                 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
461                 (out, err), proc = self.execute(sshs)
462                 if len(out) != 0:
463                     for line in out.strip().split("\n"):
464                         parts = line.strip().split(" ")
465                         if parts[1].startswith('root@pts'):
466                             ssh_pids.append(parts[0])
467                         elif parts[1] == "-D":
468                             tree_owner = parts[0]
469
470                 avoid_kill = []
471                 temp = []
472                 #Search for the child process of the pid's collected at the first block.
473                 for process in ssh_pids:
474                     temp = self.search_for_child(process, pids, ppid)
475                     avoid_kill = list(set(temp))
476                 
477                 if len(avoid_kill) > 0:
478                     avoid_kill.append(tree_owner) 
479                 ########################
480
481                 import pickle
482                 with open("/tmp/save.proc", "rb") as pickle_file:
483                     pids = pickle.load(pickle_file)
484                 pids_temp = dict()
485                 ps_aux = "ps aux | awk '{print $2,$11}'"
486                 (out, err), proc = self.execute(ps_aux)
487                 if len(out) != 0:
488                     for line in out.strip().split("\n"):
489                         parts = line.strip().split(" ")
490                         pids_temp[parts[0]] = parts[1]
491                     # creates the difference between the machine pids freezed (pickle) and the actual
492                     # adding the avoided pids filtered above (avoid_kill) to allow users keep process
493                     # alive when using besides ssh connections  
494                     kill_pids = set(pids_temp.items()) - set(pids.items())
495                     # py2/py3 : keep it simple
496                     kill_pids = ' '.join(kill_pids)
497
498                     # removing pids from beside connections and its process
499                     kill_pids = kill_pids.split(' ')
500                     kill_pids = list(set(kill_pids) - set(avoid_kill))
501                     kill_pids = ' '.join(kill_pids)
502
503                     cmd = ("killall tcpdump || /bin/true ; " +
504                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
505                            "kill {} || /bin/true ; ".format(kill_pids))
506                 else:
507                     cmd = ("killall tcpdump || /bin/true ; " +
508                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
509             else:
510                 cmd = ("killall tcpdump || /bin/true ; " +
511                        "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
512
513         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
514
515     def search_for_child(self, pid, pids, ppid, family=[]):
516         """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
517         """
518         family.append(pid)
519         for key, value in enumerate(ppid):
520             if value == pid:
521                 child = pids[key]
522                 self.search_for_child(child, pids, ppid)
523         return family
524         
525     def clean_home(self):
526         """ Cleans all NEPI related folders in the Linux host
527         """
528         self.info("Cleaning up home")
529         
530         cmd = "cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "\
531               .format(self.home_dir)
532
533         return self.execute(cmd, with_lock = True)
534
535     def clean_experiment(self):
536         """ Cleans all experiment related files in the Linux host.
537         It preserves NEPI files and folders that have a multi experiment
538         scope.
539         """
540         self.info("Cleaning up experiment files")
541         
542         cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "\
543               .format(self.exp_dir, self.ec.exp_id)
544         
545         return self.execute(cmd, with_lock = True)
546
547     def execute(self, command,
548                 sudo = False,
549                 env = None,
550                 tty = False,
551                 forward_x11 = False,
552                 retry = 3,
553                 connect_timeout = 30,
554                 strict_host_checking = False,
555                 persistent = True,
556                 blocking = True,
557                 with_lock = False
558     ):
559         """ Notice that this invocation will block until the
560         execution finishes. If this is not the desired behavior,
561         use 'run' instead."""
562
563         if self.localhost:
564             (out, err), proc = execfuncs.lexec(
565                 command, 
566                 user = self.get("username"), # still problem with localhost
567                 sudo = sudo,
568                 env = env)
569         else:
570             if with_lock:
571                 # If the execute command is blocking, we don't want to keep
572                 # the node lock. This lock is used to avoid race conditions
573                 # when creating the ControlMaster sockets. A more elegant
574                 # solution is needed.
575                 with self._node_lock:
576                     (out, err), proc = sshfuncs.rexec(
577                         command, 
578                         host = self.get("hostname"),
579                         user = self.get("username"),
580                         port = self.get("port"),
581                         gwuser = self.get("gatewayUser"),
582                         gw = self.get("gateway"),
583                         agent = True,
584                         sudo = sudo,
585                         identity = self.get("identity"),
586                         server_key = self.get("serverKey"),
587                         env = env,
588                         tty = tty,
589                         forward_x11 = forward_x11,
590                         retry = retry,
591                         connect_timeout = connect_timeout,
592                         persistent = persistent,
593                         blocking = blocking, 
594                         strict_host_checking = strict_host_checking
595                     )
596             else:
597                 (out, err), proc = sshfuncs.rexec(
598                     command, 
599                     host = self.get("hostname"),
600                     user = self.get("username"),
601                     port = self.get("port"),
602                     gwuser = self.get("gatewayUser"),
603                     gw = self.get("gateway"),
604                     agent = True,
605                     sudo = sudo,
606                     identity = self.get("identity"),
607                     server_key = self.get("serverKey"),
608                     env = env,
609                     tty = tty,
610                     forward_x11 = forward_x11,
611                     retry = retry,
612                     connect_timeout = connect_timeout,
613                     persistent = persistent,
614                     blocking = blocking, 
615                     strict_host_checking = strict_host_checking
616                 )
617
618         return (out, err), proc
619
620     def run(self, command, home,
621             create_home = False,
622             pidfile = 'pidfile',
623             stdin = None, 
624             stdout = 'stdout', 
625             stderr = 'stderr', 
626             sudo = False,
627             tty = False,
628             strict_host_checking = False):
629         
630         self.debug("Running command '{}'".format(command))
631         
632         if self.localhost:
633             (out, err), proc = execfuncs.lspawn(
634                 command, pidfile,
635                 home = home, 
636                 create_home = create_home, 
637                 stdin = stdin or '/dev/null',
638                 stdout = stdout or '/dev/null',
639                 stderr = stderr or '/dev/null',
640                 sudo = sudo) 
641         else:
642             with self._node_lock:
643                 (out, err), proc = sshfuncs.rspawn(
644                     command,
645                     pidfile = pidfile,
646                     home = home,
647                     create_home = create_home,
648                     stdin = stdin or '/dev/null',
649                     stdout = stdout or '/dev/null',
650                     stderr = stderr or '/dev/null',
651                     sudo = sudo,
652                     host = self.get("hostname"),
653                     user = self.get("username"),
654                     port = self.get("port"),
655                     gwuser = self.get("gatewayUser"),
656                     gw = self.get("gateway"),
657                     agent = True,
658                     identity = self.get("identity"),
659                     server_key = self.get("serverKey"),
660                     tty = tty,
661                     strict_host_checking = strict_host_checking
662                 )
663
664         return (out, err), proc
665
666     def getpid(self, home, pidfile = "pidfile"):
667         if self.localhost:
668             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
669         else:
670             with self._node_lock:
671                 pidtuple = sshfuncs.rgetpid(
672                     os.path.join(home, pidfile),
673                     host = self.get("hostname"),
674                     user = self.get("username"),
675                     port = self.get("port"),
676                     gwuser = self.get("gatewayUser"),
677                     gw = self.get("gateway"),
678                     agent = True,
679                     identity = self.get("identity"),
680                     server_key = self.get("serverKey"),
681                     strict_host_checking = False
682                 )
683         
684         return pidtuple
685
686     def status(self, pid, ppid):
687         if self.localhost:
688             status = execfuncs.lstatus(pid, ppid)
689         else:
690             with self._node_lock:
691                 status = sshfuncs.rstatus(
692                     pid, ppid,
693                     host = self.get("hostname"),
694                     user = self.get("username"),
695                     port = self.get("port"),
696                     gwuser = self.get("gatewayUser"),
697                     gw = self.get("gateway"),
698                     agent = True,
699                     identity = self.get("identity"),
700                     server_key = self.get("serverKey"),
701                     strict_host_checking = False
702                 )
703            
704         return status
705     
706     def kill(self, pid, ppid, sudo = False):
707         out = err = ""
708         proc = None
709         status = self.status(pid, ppid)
710
711         if status == sshfuncs.ProcStatus.RUNNING:
712             if self.localhost:
713                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
714             else:
715                 with self._node_lock:
716                     (out, err), proc = sshfuncs.rkill(
717                         pid, ppid,
718                         host = self.get("hostname"),
719                         user = self.get("username"),
720                         port = self.get("port"),
721                         gwuser = self.get("gatewayUser"),
722                         gw = self.get("gateway"),
723                         agent = True,
724                         sudo = sudo,
725                         identity = self.get("identity"),
726                         server_key = self.get("serverKey"),
727                         strict_host_checking = False
728                     )
729
730         return (out, err), proc
731
732     def copy(self, src, dst):
733         if self.localhost:
734             (out, err), proc = execfuncs.lcopy(
735                 src, dst, 
736                 recursive = True)
737         else:
738             with self._node_lock:
739                 (out, err), proc = sshfuncs.rcopy(
740                     src, dst, 
741                     port = self.get("port"),
742                     gwuser = self.get("gatewayUser"),
743                     gw = self.get("gateway"),
744                     identity = self.get("identity"),
745                     server_key = self.get("serverKey"),
746                     recursive = True,
747                     strict_host_checking = False)
748
749         return (out, err), proc
750
751     def upload(self, src, dst, text = False, overwrite = True,
752                raise_on_error = True):
753         """ Copy content to destination
754
755         src  string with the content to copy. Can be:
756             - plain text
757             - a string with the path to a local file
758             - a string with a semi-colon separeted list of local files
759             - a string with a local directory
760
761         dst  string with destination path on the remote host (remote is 
762             always self.host)
763
764         text src is text input, it must be stored into a temp file before 
765         uploading
766         """
767         # If source is a string input 
768         f = None
769         if text and not os.path.isfile(src):
770             # src is text input that should be uploaded as file
771             # create a temporal file with the content to upload
772             # in python3 we need to open in binary mode if str is bytes
773             mode = 'w' if isinstance(src, str) else 'wb'
774             f = tempfile.NamedTemporaryFile(mode=mode, delete=False)
775             f.write(src)
776             f.close()
777             src = f.name
778
779         # If dst files should not be overwritten, check that the files do not
780         # exist already
781         if isinstance(src, str):
782             src = [s.strip() for s in src.split(";")]
783     
784         if overwrite == False:
785             src = self.filter_existing_files(src, dst)
786             if not src:
787                 return ("", ""), None
788
789         if not self.localhost:
790             # Build destination as <user>@<server>:<path>
791             dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
792
793         ((out, err), proc) = self.copy(src, dst)
794
795         # clean up temp file
796         if f:
797             os.remove(f.name)
798
799         if err:
800             msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
801             self.error(msg, out, err)
802             
803             msg = "{} out: {} err: {}".format(msg, out, err)
804             if raise_on_error:
805                 raise RuntimeError(msg)
806
807         return ((out, err), proc)
808
809     def download(self, src, dst, raise_on_error = True):
810         if not self.localhost:
811             # Build destination as <user>@<server>:<path>
812             src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
813
814         ((out, err), proc) = self.copy(src, dst)
815
816         if err:
817             msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst) 
818             self.error(msg, out, err)
819
820             if raise_on_error:
821                 raise RuntimeError(msg)
822
823         return ((out, err), proc)
824
825     def install_packages_command(self, packages):
826         command = ""
827         if self.use_rpm:
828             command = rpmfuncs.install_packages_command(self.os, packages)
829         elif self.use_deb:
830             command = debfuncs.install_packages_command(self.os, packages)
831         else:
832             msg = "Error installing packages ( OS not known ) "
833             self.error(msg, self.os)
834             raise RuntimeError(msg)
835
836         return command
837
838     def install_packages(self, packages, home,
839                          run_home = None,
840                          raise_on_error = True):
841         """ Install packages in the Linux host.
842
843         'home' is the directory to upload the package installation script.
844         'run_home' is the directory from where to execute the script.
845         """
846         command = self.install_packages_command(packages)
847
848         run_home = run_home or home
849
850         (out, err), proc = self.run_and_wait(command, run_home, 
851                                              shfile = os.path.join(home, "instpkg.sh"),
852                                              pidfile = "instpkg_pidfile",
853                                              ecodefile = "instpkg_exitcode",
854                                              stdout = "instpkg_stdout", 
855                                              stderr = "instpkg_stderr",
856                                              overwrite = False,
857                                              raise_on_error = raise_on_error)
858
859         return (out, err), proc 
860
861     def remove_packages(self, packages, home, run_home = None,
862                         raise_on_error = True):
863         """ Uninstall packages from the Linux host.
864
865         'home' is the directory to upload the package un-installation script.
866         'run_home' is the directory from where to execute the script.
867         """
868         if self.use_rpm:
869             command = rpmfuncs.remove_packages_command(self.os, packages)
870         elif self.use_deb:
871             command = debfuncs.remove_packages_command(self.os, packages)
872         else:
873             msg = "Error removing packages ( OS not known ) "
874             self.error(msg)
875             raise RuntimeError(msg)
876
877         run_home = run_home or home
878
879         (out, err), proc = self.run_and_wait(command, run_home, 
880                                              shfile = os.path.join(home, "rmpkg.sh"),
881                                              pidfile = "rmpkg_pidfile",
882                                              ecodefile = "rmpkg_exitcode",
883                                              stdout = "rmpkg_stdout", 
884                                              stderr = "rmpkg_stderr",
885                                              overwrite = False,
886                                              raise_on_error = raise_on_error)
887         
888         return (out, err), proc 
889
890     def mkdir(self, paths, clean = False):
891         """ Paths is either a single remote directory path to create,
892         or a list of directories to create.
893         """
894         if clean:
895             self.rmdir(paths)
896
897         if isinstance(paths, str):
898             paths = [paths]
899
900         cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
901
902         return self.execute(cmd, with_lock = True)
903
904     def rmdir(self, paths):
905         """ Paths is either a single remote directory path to delete,
906         or a list of directories to delete.
907         """
908
909         if isinstance(paths, str):
910             paths = [paths]
911
912         cmd = " ; ".join(["rm -rf {}".format(path) for path in paths])
913
914         return self.execute(cmd, with_lock = True)
915     
916     def run_and_wait(self, command, home, 
917                      shfile="cmd.sh",
918                      env=None,
919                      overwrite=True,
920                      wait_run=True,
921                      pidfile="pidfile", 
922                      ecodefile="exitcode", 
923                      stdin=None, 
924                      stdout="stdout", 
925                      stderr="stderr", 
926                      sudo=False,
927                      tty=False,
928                      raise_on_error=True):
929         """
930         Uploads the 'command' to a bash script in the host.
931         Then runs the script detached in background in the host, and
932         busy-waites until the script finishes executing.
933         """
934
935         if not shfile.startswith("/"):
936             shfile = os.path.join(home, shfile)
937
938         self.upload_command(command, 
939                             shfile = shfile, 
940                             ecodefile = ecodefile, 
941                             env = env,
942                             overwrite = overwrite)
943
944         command = "bash {}".format(shfile)
945         # run command in background in remote host
946         (out, err), proc = self.run(command, home, 
947                                     pidfile = pidfile,
948                                     stdin = stdin, 
949                                     stdout = stdout, 
950                                     stderr = stderr, 
951                                     sudo = sudo,
952                                     tty = tty)
953
954         # check no errors occurred
955         if proc.poll():
956             msg = " Failed to run command '{}' ".format(command)
957             self.error(msg, out, err)
958             if raise_on_error:
959                 raise RuntimeError(msg)
960
961         # Wait for pid file to be generated
962         pid, ppid = self.wait_pid(
963             home = home, 
964             pidfile = pidfile, 
965             raise_on_error = raise_on_error)
966
967         if wait_run:
968             # wait until command finishes to execute
969             self.wait_run(pid, ppid)
970             
971             (eout, err), proc = self.check_errors(home,
972                                                   ecodefile = ecodefile,
973                                                   stderr = stderr)
974
975             # Out is what was written in the stderr file
976             if err:
977                 msg = " Failed to run command '{}' ".format(command)
978                 self.error(msg, eout, err)
979
980                 if raise_on_error:
981                     raise RuntimeError(msg)
982
983         (out, oerr), proc = self.check_output(home, stdout)
984         
985         return (out, err), proc
986         
987     def exitcode(self, home, ecodefile = "exitcode"):
988         """
989         Get the exit code of an application.
990         Returns an integer value with the exit code 
991         """
992         (out, err), proc = self.check_output(home, ecodefile)
993
994         # Succeeded to open file, return exit code in the file
995         if proc.wait() == 0:
996             try:
997                 return int(out.strip())
998             except:
999                 # Error in the content of the file!
1000                 return ExitCode.CORRUPTFILE
1001
1002         # No such file or directory
1003         if proc.returncode == 1:
1004             return ExitCode.FILENOTFOUND
1005         
1006         # Other error from 'cat'
1007         return ExitCode.ERROR
1008
1009     def upload_command(self, command, 
1010                        shfile="cmd.sh",
1011                        ecodefile="exitcode",
1012                        overwrite=True,
1013                        env=None):
1014         """ Saves the command as a bash script file in the remote host, and
1015         forces to save the exit code of the command execution to the ecodefile
1016         """
1017
1018         if not (command.strip().endswith(";") or command.strip().endswith("&")):
1019             command += ";"
1020             
1021         # The exit code of the command will be stored in ecodefile
1022         command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1023                   .format(command=command, ecodefile=ecodefile)
1024
1025         # Export environment
1026         environ = self.format_environment(env)
1027
1028         # Add environ to command
1029         command = environ + command
1030
1031         return self.upload(command, shfile, text=True, overwrite=overwrite)
1032
1033     def format_environment(self, env, inline=False):
1034         """ Formats the environment variables for a command to be executed
1035         either as an inline command
1036         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
1037         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
1038         """
1039         if not env: return ""
1040
1041         # Remove extra white spaces
1042         env = re.sub(r'\s+', ' ', env.strip())
1043
1044         sep = ";" if inline else "\n"
1045         return sep.join([" export {}".format(e) for e in env.split(" ")]) + sep 
1046
1047     def check_errors(self, home, 
1048                      ecodefile = "exitcode", 
1049                      stderr = "stderr"):
1050         """ Checks whether errors occurred while running a command.
1051         It first checks the exit code for the command, and only if the
1052         exit code is an error one it returns the error output.
1053
1054         """
1055         proc = None
1056         err = ""
1057
1058         # get exit code saved in the 'exitcode' file
1059         ecode = self.exitcode(home, ecodefile)
1060
1061         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1062             err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1063         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1064             # The process returned an error code or didn't exist. 
1065             # Check standard error.
1066             (err, eerr), proc = self.check_output(home, stderr)
1067
1068             # If the stderr file was not found, assume nothing bad happened,
1069             # and just ignore the error.
1070             # (cat returns 1 for error "No such file or directory")
1071             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1072                 err = "" 
1073                 
1074         return ("", err), proc
1075     
1076     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1077         """ Waits until the pid file for the command is generated, 
1078             and returns the pid and ppid of the process """
1079         pid = ppid = None
1080         delay = 1.0
1081
1082         for i in range(2):
1083             pidtuple = self.getpid(home = home, pidfile = pidfile)
1084             
1085             if pidtuple:
1086                 pid, ppid = pidtuple
1087                 break
1088             else:
1089                 time.sleep(delay)
1090                 delay = delay * 1.5
1091         else:
1092             msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1093             self.error(msg)
1094     
1095             if raise_on_error:
1096                 raise RuntimeError(msg)
1097
1098         return pid, ppid
1099
1100     def wait_run(self, pid, ppid, trial = 0):
1101         """ wait for a remote process to finish execution """
1102         delay = 1.0
1103
1104         while True:
1105             status = self.status(pid, ppid)
1106             
1107             if status is ProcStatus.FINISHED:
1108                 break
1109             elif status is not ProcStatus.RUNNING:
1110                 delay = delay * 1.5
1111                 time.sleep(delay)
1112                 # If it takes more than 20 seconds to start, then
1113                 # asume something went wrong
1114                 if delay > 20:
1115                     break
1116             else:
1117                 # The app is running, just wait...
1118                 time.sleep(0.5)
1119
1120     def check_output(self, home, filename):
1121         """ Retrives content of file """
1122         (out, err), proc = self.execute(
1123             "cat {}".format(os.path.join(home, filename)), retry = 1, with_lock = True)
1124         return (out, err), proc
1125
1126     def is_alive(self):
1127         """ Checks if host is responsive
1128         """
1129         if self.localhost:
1130             return True
1131
1132         out = err = ""
1133         msg = "Unresponsive host. Wrong answer. "
1134
1135         # The underlying SSH layer will sometimes return an empty
1136         # output (even if the command was executed without errors).
1137         # To work arround this, repeat the operation N times or
1138         # until the result is not empty string
1139         try:
1140             (out, err), proc = self.execute("echo 'ALIVE'",
1141                                             blocking = True,
1142                                             with_lock = True)
1143             
1144             if out.find("ALIVE") > -1:
1145                 return True
1146         except:
1147             trace = traceback.format_exc()
1148             msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1149
1150         self.error(msg, out, err)
1151         return False
1152
1153     def find_home(self):
1154         """ 
1155         Retrieves host home directory
1156         """
1157         # The underlying SSH layer will sometimes return an empty
1158         # output (even if the command was executed without errors).
1159         # To work arround this, repeat the operation N times or
1160         # until the result is not empty string
1161         msg = "Impossible to retrieve HOME directory"
1162         try:
1163             (out, err), proc = self.execute("echo ${HOME}",
1164                                             blocking = True,
1165                                             with_lock = True)
1166             
1167             if out.strip() != "":
1168                 self._home_dir =  out.strip()
1169         except:
1170             trace = traceback.format_exc()
1171             msg = "Impossible to retrieve HOME directory {}".format(trace)
1172
1173         if not self._home_dir:
1174             self.error(msg)
1175             raise RuntimeError(msg)
1176
1177     def filter_existing_files(self, src, dst):
1178         """ Removes files that already exist in the Linux host from src list
1179         """
1180         # construct a dictionary with { dst: src }
1181         dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1182                 if len(src) > 1 else {dst: src[0]}
1183
1184         command = []
1185         for d in dests:
1186             command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1187
1188         command = ";".join(command)
1189
1190         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1191         
1192         # avoid RuntimeError that would result from
1193         # changing loop subject during iteration
1194         keys = list(dests.keys())
1195         for d in keys:
1196             if out.find(d) > -1:
1197                 del dests[d]
1198
1199         if not dests:
1200             return []
1201
1202         retcod = dests.values()
1203         if PY3: retcod = list(retcod)
1204         return retcod