node.upload knows how to optionnally create executable files
[nepi.git] / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy, 
21                                      ResourceState)
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import stat
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 from six import PY3
37
38 # TODO: Unify delays!!
39 # TODO: Validate outcome of uploads!! 
40
41 class ExitCode:
42     """
43     Error codes that the rexitcode function can return if unable to
44     check the exit code of a spawned process
45     """
46     FILENOTFOUND = -1
47     CORRUPTFILE = -2
48     ERROR = -3
49     OK = 0
50
51 class OSType:
52     """
53     Supported flavors of Linux OS
54     """
55     DEBIAN = 1 
56     UBUNTU = 1 << 1 
57     FEDORA = 1 << 2
58     FEDORA_8 = 1 << 3 | FEDORA 
59     FEDORA_12 = 1 << 4 | FEDORA 
60     FEDORA_14 = 1 << 5 | FEDORA 
61
62 @clsinit_copy
63 class LinuxNode(ResourceManager):
64     """
65     .. class:: Class Args :
66       
67         :param ec: The Experiment controller
68         :type ec: ExperimentController
69         :param guid: guid of the RM
70         :type guid: int
71
72     .. note::
73
74         There are different ways in which commands can be executed using the
75         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
76         'run_and_wait'). 
77         
78         Brief explanation:
79
80             * 'execute' (blocking mode) :  
81
82                      HOW IT WORKS: 'execute', forks a process and run the
83                      command, synchronously, attached to the terminal, in
84                      foreground.
85                      The execute method will block until the command returns
86                      the result on 'out', 'err' (so until it finishes executing).
87   
88                      USAGE: short-lived commands that must be executed attached
89                      to a terminal and in foreground, for which it IS necessary
90                      to block until the command has finished (e.g. if you want
91                      to run 'ls' or 'cat').
92
93             * 'execute' (NON blocking mode - blocking = False) :
94
95                     HOW IT WORKS: Same as before, except that execute method
96                     will return immediately (even if command still running).
97
98                     USAGE: long-lived commands that must be executed attached
99                     to a terminal and in foreground, but for which it is not
100                     necessary to block until the command has finished. (e.g.
101                     start an application using X11 forwarding)
102
103              * 'run' :
104
105                    HOW IT WORKS: Connects to the host ( using SSH if remote)
106                    and launches the command in background, detached from any
107                    terminal (daemonized), and returns. The command continues to
108                    run remotely, but since it is detached from the terminal,
109                    its pipes (stdin, stdout, stderr) can't be redirected to the
110                    console (as normal non detached processes would), and so they
111                    are explicitly redirected to files. The pidfile is created as
112                    part of the process of launching the command. The pidfile
113                    holds the pid and ppid of the process forked in background,
114                    so later on it is possible to check whether the command is still
115                    running.
116
117                     USAGE: long-lived commands that can run detached in background,
118                     for which it is NOT necessary to block (wait) until the command
119                     has finished. (e.g. start an application that is not using X11
120                     forwarding. It can run detached and remotely in background)
121
122              * 'run_and_wait' :
123
124                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
125                     the command has finished execution. It also checks whether
126                     errors occurred during runtime by reading the exitcode file,
127                     which contains the exit code of the command that was run
128                     (checking stderr only is not always reliable since many
129                     commands throw debugging info to stderr and the only way to
130                     automatically know whether an error really happened is to
131                     check the process exit code).
132
133                     Another difference with respect to 'run', is that instead
134                     of directly executing the command as a bash command line,
135                     it uploads the command to a bash script and runs the script.
136                     This allows to use the bash script to debug errors, since
137                     it remains at the remote host and can be run manually to
138                     reproduce the error.
139                   
140                     USAGE: medium-lived commands that can run detached in
141                     background, for which it IS necessary to block (wait) until
142                     the command has finished. (e.g. Package installation,
143                     source compilation, file download, etc)
144
145     """
146     _rtype = "linux::Node"
147     _help = "Controls Linux host machines ( either localhost or a host " \
148             "that can be accessed using a SSH key)"
149     _platform = "linux"
150
151     @classmethod
152     def _register_attributes(cls):
153         cls._register_attribute(
154             Attribute("hostname",
155                       "Hostname of the machine",
156                       flags = Flags.Design))
157         cls._register_attribute(
158             Attribute("username",
159                       "Local account username", 
160                       flags = Flags.Credential))
161         cls._register_attribute(
162             Attribute("port",
163                       "SSH port",
164                       flags = Flags.Design))
165         cls._register_attribute(
166             Attribute("home",
167                       "Experiment home directory to store all experiment related files",
168                       flags = Flags.Design))
169         cls._register_attribute(
170             Attribute("identity",
171                       "SSH identity file",
172                       flags = Flags.Credential))
173         cls._register_attribute(
174             Attribute("serverKey",
175                       "Server public key", 
176                       flags = Flags.Design))
177         cls._register_attribute(
178             Attribute("cleanHome",
179                       "Remove all nepi files and directories "
180                       " from node home folder before starting experiment", 
181                       type = Types.Bool,
182                       default = False,
183             flags = Flags.Design))
184         cls._register_attribute(
185             Attribute("cleanExperiment",
186                       "Remove all files and directories " 
187                       " from a previous same experiment, before the new experiment starts", 
188                       type = Types.Bool,
189                       default = False,
190                       flags = Flags.Design))
191         cls._register_attribute(
192             Attribute("cleanProcesses",
193                       "Kill all running processes before starting experiment",
194                       type = Types.Bool,
195                       default = False,
196                       flags = Flags.Design))
197         cls._register_attribute(
198             Attribute("cleanProcessesAfter",
199                       "Kill all running processes after starting experiment"
200                       "NOTE: This might be dangerous when using user root",
201                       type = Types.Bool,
202                       default = True,
203                       flags = Flags.Design))
204         cls._register_attribute(
205             Attribute("tearDown",
206                       "Bash script to be executed before releasing the resource",
207                       flags = Flags.Design))
208         cls._register_attribute(
209             Attribute("gatewayUser",
210                       "Gateway account username",
211                       flags = Flags.Design))
212         cls._register_attribute(
213             Attribute("gateway",
214                       "Hostname of the gateway machine",
215                       flags = Flags.Design))
216         cls._register_attribute(
217             Attribute("ip",
218                       "Linux host public IP address. "
219                       "Must not be modified by the user unless hostname is 'localhost'",
220                       flags = Flags.Design))
221
222     def __init__(self, ec, guid):
223         super(LinuxNode, self).__init__(ec, guid)
224         self._os = None
225         # home directory at Linux host
226         self._home_dir = ""
227
228         # lock to prevent concurrent applications on the same node,
229         # to execute commands at the same time. There are potential
230         # concurrency issues when using SSH to a same host from 
231         # multiple threads. There are also possible operational 
232         # issues, e.g. an application querying the existence 
233         # of a file or folder prior to its creation, and another 
234         # application creating the same file or folder in between.
235         self._node_lock = threading.Lock()
236         
237     def log_message(self, msg):
238         return " guid {} - host {} - {} "\
239             .format(self.guid, self.get("hostname"), msg)
240
241     @property
242     def home_dir(self):
243         home = self.get("home") or ""
244         if not home.startswith("/"):
245             home = os.path.join(self._home_dir, home) 
246             return home
247
248     @property
249     def nepi_home(self):
250         return os.path.join(self.home_dir, ".nepi")
251
252     @property
253     def usr_dir(self):
254         return os.path.join(self.nepi_home, "nepi-usr")
255
256     @property
257     def lib_dir(self):
258         return os.path.join(self.usr_dir, "lib")
259
260     @property
261     def bin_dir(self):
262         return os.path.join(self.usr_dir, "bin")
263
264     @property
265     def src_dir(self):
266         return os.path.join(self.usr_dir, "src")
267
268     @property
269     def share_dir(self):
270         return os.path.join(self.usr_dir, "share")
271
272     @property
273     def exp_dir(self):
274         return os.path.join(self.nepi_home, "nepi-exp")
275
276     @property
277     def exp_home(self):
278         return os.path.join(self.exp_dir, self.ec.exp_id)
279
280     @property
281     def node_home(self):
282         return os.path.join(self.exp_home, "node-{}".format(self.guid))
283
284     @property
285     def run_home(self):
286         return os.path.join(self.node_home, self.ec.run_id)
287
288     @property
289     def os(self):
290         if self._os:
291             return self._os
292
293         if not self.localhost and not self.get("username"):
294             msg = "Can't resolve OS, insufficient data "
295             self.error(msg)
296             raise RuntimeError(msg)
297
298         out = self.get_os()
299
300         if out.find("Debian") == 0: 
301             self._os = OSType.DEBIAN
302         elif out.find("Ubuntu") ==0:
303             self._os = OSType.UBUNTU
304         elif out.find("Fedora release") == 0:
305             self._os = OSType.FEDORA
306             if out.find("Fedora release 8") == 0:
307                 self._os = OSType.FEDORA_8
308             elif out.find("Fedora release 12") == 0:
309                 self._os = OSType.FEDORA_12
310             elif out.find("Fedora release 14") == 0:
311                 self._os = OSType.FEDORA_14
312         else:
313             msg = "Unsupported OS"
314             self.error(msg, out)
315             raise RuntimeError("{} - {} ".format(msg, out))
316
317         return self._os
318
319     def get_os(self):
320         # The underlying SSH layer will sometimes return an empty
321         # output (even if the command was executed without errors).
322         # To work arround this, repeat the operation N times or
323         # until the result is not empty string
324         out = ""
325         try:
326             (out, err), proc = self.execute("cat /etc/issue", 
327                                             with_lock = True,
328                                             blocking = True)
329         except:
330             trace = traceback.format_exc()
331             msg = "Error detecting OS: {} ".format(trace)
332             self.error(msg, out, err)
333     
334         return out
335
336     @property
337     def use_deb(self):
338         return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
339
340     @property
341     def use_rpm(self):
342         return (self.os & OSType.FEDORA)
343
344     @property
345     def localhost(self):
346         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
347
348     def do_provision(self):
349         # check if host is alive
350         if not self.is_alive():
351             trace = traceback.format_exc()
352             msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
353             self.error(msg)
354             raise RuntimeError(msg)
355
356         self.find_home()
357
358         if self.get("cleanProcesses"):
359             self.clean_processes()
360
361         if self.get("cleanHome"):
362             self.clean_home()
363             
364         if self.get("cleanExperiment"):
365             self.clean_experiment()
366             
367         # Create shared directory structure and node home directory
368         paths = [self.lib_dir, 
369                  self.bin_dir, 
370                  self.src_dir, 
371                  self.share_dir, 
372                  self.node_home]
373
374         self.mkdir(paths)
375
376         # Get Public IP address if possible
377         if not self.get("ip"):
378             try:
379                 ip = sshfuncs.gethostbyname(self.get("hostname"))
380                 self.set("ip", ip)
381             except:
382                 if self.get("gateway") is None:
383                     msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
384                     self.error(msg)
385
386         super(LinuxNode, self).do_provision()
387
388     def do_deploy(self):
389         if self.state == ResourceState.NEW:
390             self.info("Deploying node")
391             self.do_discover()
392             self.do_provision()
393
394         # Node needs to wait until all associated interfaces are 
395         # ready before it can finalize deployment
396         from nepi.resources.linux.interface import LinuxInterface
397         ifaces = self.get_connected(LinuxInterface.get_rtype())
398         for iface in ifaces:
399             if iface.state < ResourceState.READY:
400                 self.ec.schedule(self.reschedule_delay, self.deploy)
401                 return 
402
403         super(LinuxNode, self).do_deploy()
404
405     def do_release(self):
406         rms = self.get_connected()
407         for rm in rms:
408             # Node needs to wait until all associated RMs are released
409             # before it can be released
410             if rm.state != ResourceState.RELEASED:
411                 self.ec.schedule(self.reschedule_delay, self.release)
412                 return 
413
414         tear_down = self.get("tearDown")
415         if tear_down:
416             self.execute(tear_down)
417
418         if self.get("cleanProcessesAfter"):
419             self.clean_processes()
420
421         super(LinuxNode, self).do_release()
422
423     def valid_connection(self, guid):
424         # TODO: Validate!
425         return True
426
427     def clean_processes(self):
428         self.info("Cleaning up processes")
429
430         if self.localhost:
431             return 
432         
433         if self.get("username") != 'root':
434             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
435                    "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
436                    "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
437         else:
438             if self.state >= ResourceState.READY:
439                 ########################
440                 #Collect all process (must change for a more intelligent way)
441                 ppid = []
442                 pids = []
443                 avoid_pids = "ps axjf | awk '{print $1,$2}'"
444                 (out, err), proc = self.execute(avoid_pids)
445                 if len(out) != 0:
446                     for line in out.strip().split("\n"):
447                         parts = line.strip().split(" ")
448                         ppid.append(parts[0])
449                         pids.append(parts[1])
450
451                 #Collect all process below ssh -D
452                 tree_owner = 0
453                 ssh_pids = []
454                 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
455                 (out, err), proc = self.execute(sshs)
456                 if len(out) != 0:
457                     for line in out.strip().split("\n"):
458                         parts = line.strip().split(" ")
459                         if parts[1].startswith('root@pts'):
460                             ssh_pids.append(parts[0])
461                         elif parts[1] == "-D":
462                             tree_owner = parts[0]
463
464                 avoid_kill = []
465                 temp = []
466                 #Search for the child process of the pid's collected at the first block.
467                 for process in ssh_pids:
468                     temp = self.search_for_child(process, pids, ppid)
469                     avoid_kill = list(set(temp))
470                 
471                 if len(avoid_kill) > 0:
472                     avoid_kill.append(tree_owner) 
473                 ########################
474
475                 import pickle
476                 with open("/tmp/save.proc", "rb") as pickle_file:
477                     pids = pickle.load(pickle_file)
478                 pids_temp = dict()
479                 ps_aux = "ps aux | awk '{print $2,$11}'"
480                 (out, err), proc = self.execute(ps_aux)
481                 if len(out) != 0:
482                     for line in out.strip().split("\n"):
483                         parts = line.strip().split(" ")
484                         pids_temp[parts[0]] = parts[1]
485                     # creates the difference between the machine pids freezed (pickle) and the actual
486                     # adding the avoided pids filtered above (avoid_kill) to allow users keep process
487                     # alive when using besides ssh connections  
488                     kill_pids = set(pids_temp.items()) - set(pids.items())
489                     # py2/py3 : keep it simple
490                     kill_pids = ' '.join(kill_pids)
491
492                     # removing pids from beside connections and its process
493                     kill_pids = kill_pids.split(' ')
494                     kill_pids = list(set(kill_pids) - set(avoid_kill))
495                     kill_pids = ' '.join(kill_pids)
496
497                     cmd = ("killall tcpdump || /bin/true ; " +
498                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
499                            "kill {} || /bin/true ; ".format(kill_pids))
500                 else:
501                     cmd = ("killall tcpdump || /bin/true ; " +
502                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
503             else:
504                 cmd = ("killall tcpdump || /bin/true ; " +
505                        "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
506
507         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
508
509     def search_for_child(self, pid, pids, ppid, family=[]):
510         """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
511         """
512         family.append(pid)
513         for key, value in enumerate(ppid):
514             if value == pid:
515                 child = pids[key]
516                 self.search_for_child(child, pids, ppid)
517         return family
518         
519     def clean_home(self):
520         """ Cleans all NEPI related folders in the Linux host
521         """
522         self.info("Cleaning up home")
523         
524         cmd = "cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "\
525               .format(self.home_dir)
526
527         return self.execute(cmd, with_lock = True)
528
529     def clean_experiment(self):
530         """ Cleans all experiment related files in the Linux host.
531         It preserves NEPI files and folders that have a multi experiment
532         scope.
533         """
534         self.info("Cleaning up experiment files")
535         
536         cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "\
537               .format(self.exp_dir, self.ec.exp_id)
538         
539         return self.execute(cmd, with_lock = True)
540
541     def execute(self, command,
542                 sudo = False,
543                 env = None,
544                 tty = False,
545                 forward_x11 = False,
546                 retry = 3,
547                 connect_timeout = 30,
548                 strict_host_checking = False,
549                 persistent = True,
550                 blocking = True,
551                 with_lock = False
552     ):
553         """ Notice that this invocation will block until the
554         execution finishes. If this is not the desired behavior,
555         use 'run' instead."""
556
557         if self.localhost:
558             (out, err), proc = execfuncs.lexec(
559                 command, 
560                 user = self.get("username"), # still problem with localhost
561                 sudo = sudo,
562                 env = env)
563         else:
564             if with_lock:
565                 # If the execute command is blocking, we don't want to keep
566                 # the node lock. This lock is used to avoid race conditions
567                 # when creating the ControlMaster sockets. A more elegant
568                 # solution is needed.
569                 with self._node_lock:
570                     (out, err), proc = sshfuncs.rexec(
571                         command, 
572                         host = self.get("hostname"),
573                         user = self.get("username"),
574                         port = self.get("port"),
575                         gwuser = self.get("gatewayUser"),
576                         gw = self.get("gateway"),
577                         agent = True,
578                         sudo = sudo,
579                         identity = self.get("identity"),
580                         server_key = self.get("serverKey"),
581                         env = env,
582                         tty = tty,
583                         forward_x11 = forward_x11,
584                         retry = retry,
585                         connect_timeout = connect_timeout,
586                         persistent = persistent,
587                         blocking = blocking, 
588                         strict_host_checking = strict_host_checking
589                     )
590             else:
591                 (out, err), proc = sshfuncs.rexec(
592                     command, 
593                     host = self.get("hostname"),
594                     user = self.get("username"),
595                     port = self.get("port"),
596                     gwuser = self.get("gatewayUser"),
597                     gw = self.get("gateway"),
598                     agent = True,
599                     sudo = sudo,
600                     identity = self.get("identity"),
601                     server_key = self.get("serverKey"),
602                     env = env,
603                     tty = tty,
604                     forward_x11 = forward_x11,
605                     retry = retry,
606                     connect_timeout = connect_timeout,
607                     persistent = persistent,
608                     blocking = blocking, 
609                     strict_host_checking = strict_host_checking
610                 )
611
612         return (out, err), proc
613
614     def run(self, command, home,
615             create_home = False,
616             pidfile = 'pidfile',
617             stdin = None, 
618             stdout = 'stdout', 
619             stderr = 'stderr', 
620             sudo = False,
621             tty = False,
622             strict_host_checking = False):
623         
624         self.debug("Running command '{}'".format(command))
625         
626         if self.localhost:
627             (out, err), proc = execfuncs.lspawn(
628                 command, pidfile,
629                 home = home, 
630                 create_home = create_home, 
631                 stdin = stdin or '/dev/null',
632                 stdout = stdout or '/dev/null',
633                 stderr = stderr or '/dev/null',
634                 sudo = sudo) 
635         else:
636             with self._node_lock:
637                 (out, err), proc = sshfuncs.rspawn(
638                     command,
639                     pidfile = pidfile,
640                     home = home,
641                     create_home = create_home,
642                     stdin = stdin or '/dev/null',
643                     stdout = stdout or '/dev/null',
644                     stderr = stderr or '/dev/null',
645                     sudo = sudo,
646                     host = self.get("hostname"),
647                     user = self.get("username"),
648                     port = self.get("port"),
649                     gwuser = self.get("gatewayUser"),
650                     gw = self.get("gateway"),
651                     agent = True,
652                     identity = self.get("identity"),
653                     server_key = self.get("serverKey"),
654                     tty = tty,
655                     strict_host_checking = strict_host_checking
656                 )
657
658         return (out, err), proc
659
660     def getpid(self, home, pidfile = "pidfile"):
661         if self.localhost:
662             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
663         else:
664             with self._node_lock:
665                 pidtuple = sshfuncs.rgetpid(
666                     os.path.join(home, pidfile),
667                     host = self.get("hostname"),
668                     user = self.get("username"),
669                     port = self.get("port"),
670                     gwuser = self.get("gatewayUser"),
671                     gw = self.get("gateway"),
672                     agent = True,
673                     identity = self.get("identity"),
674                     server_key = self.get("serverKey"),
675                     strict_host_checking = False
676                 )
677         
678         return pidtuple
679
680     def status(self, pid, ppid):
681         if self.localhost:
682             status = execfuncs.lstatus(pid, ppid)
683         else:
684             with self._node_lock:
685                 status = sshfuncs.rstatus(
686                     pid, ppid,
687                     host = self.get("hostname"),
688                     user = self.get("username"),
689                     port = self.get("port"),
690                     gwuser = self.get("gatewayUser"),
691                     gw = self.get("gateway"),
692                     agent = True,
693                     identity = self.get("identity"),
694                     server_key = self.get("serverKey"),
695                     strict_host_checking = False
696                 )
697            
698         return status
699     
700     def kill(self, pid, ppid, sudo = False):
701         out = err = ""
702         proc = None
703         status = self.status(pid, ppid)
704
705         if status == sshfuncs.ProcStatus.RUNNING:
706             if self.localhost:
707                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
708             else:
709                 with self._node_lock:
710                     (out, err), proc = sshfuncs.rkill(
711                         pid, ppid,
712                         host = self.get("hostname"),
713                         user = self.get("username"),
714                         port = self.get("port"),
715                         gwuser = self.get("gatewayUser"),
716                         gw = self.get("gateway"),
717                         agent = True,
718                         sudo = sudo,
719                         identity = self.get("identity"),
720                         server_key = self.get("serverKey"),
721                         strict_host_checking = False
722                     )
723
724         return (out, err), proc
725
726     def copy(self, src, dst):
727         if self.localhost:
728             (out, err), proc = execfuncs.lcopy(
729                 src, dst, 
730                 recursive = True)
731         else:
732             with self._node_lock:
733                 (out, err), proc = sshfuncs.rcopy(
734                     src, dst, 
735                     port = self.get("port"),
736                     gwuser = self.get("gatewayUser"),
737                     gw = self.get("gateway"),
738                     identity = self.get("identity"),
739                     server_key = self.get("serverKey"),
740                     recursive = True,
741                     strict_host_checking = False)
742
743         return (out, err), proc
744
745     def upload(self, src, dst, text = False, overwrite = True,
746                raise_on_error = True, executable = False):
747         """ Copy content to destination
748
749         src  string with the content to copy. Can be:
750             - plain text
751             - a string with the path to a local file
752             - a string with a semi-colon separeted list of local files
753             - a string with a local directory
754
755         dst  string with destination path on the remote host (remote is 
756             always self.host)
757
758         when src is text input, it gets stored into a temp file before 
759         uploading; in this case, and if executable is True, said temp file
760         is made executable, and thus uploaded file will be too
761         """
762         # If source is a string input 
763         f = None
764         if text and not os.path.isfile(src):
765             # src is text input that should be uploaded as file
766             # create a temporal file with the content to upload
767             # in python3 we need to open in binary mode if str is bytes
768             mode = 'w' if isinstance(src, str) else 'wb'
769             f = tempfile.NamedTemporaryFile(mode=mode, delete=False)
770             f.write(src)
771             f.close()
772             if executable:
773                 # do something like chmod u+x
774                 mode = os.stat(f.name).st_mode
775                 mode |= stat.S_IXUSR
776                 os.chmod(f.name, mode)
777                 
778             src = f.name
779
780         # If dst files should not be overwritten, check that the files do not
781         # exist already
782         if isinstance(src, str):
783             src = [s.strip() for s in src.split(";")]
784     
785         if overwrite == False:
786             src = self.filter_existing_files(src, dst)
787             if not src:
788                 return ("", ""), None
789
790         if not self.localhost:
791             # Build destination as <user>@<server>:<path>
792             dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
793
794         ((out, err), proc) = self.copy(src, dst)
795
796         # clean up temp file
797         if f:
798             os.remove(f.name)
799
800         if err:
801             msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
802             self.error(msg, out, err)
803             
804             msg = "{} out: {} err: {}".format(msg, out, err)
805             if raise_on_error:
806                 raise RuntimeError(msg)
807
808         return ((out, err), proc)
809
810     def download(self, src, dst, raise_on_error = True):
811         if not self.localhost:
812             # Build destination as <user>@<server>:<path>
813             src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
814
815         ((out, err), proc) = self.copy(src, dst)
816
817         if err:
818             msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst) 
819             self.error(msg, out, err)
820
821             if raise_on_error:
822                 raise RuntimeError(msg)
823
824         return ((out, err), proc)
825
826     def install_packages_command(self, packages):
827         command = ""
828         if self.use_rpm:
829             command = rpmfuncs.install_packages_command(self.os, packages)
830         elif self.use_deb:
831             command = debfuncs.install_packages_command(self.os, packages)
832         else:
833             msg = "Error installing packages ( OS not known ) "
834             self.error(msg, self.os)
835             raise RuntimeError(msg)
836
837         return command
838
839     def install_packages(self, packages, home,
840                          run_home = None,
841                          raise_on_error = True):
842         """ Install packages in the Linux host.
843
844         'home' is the directory to upload the package installation script.
845         'run_home' is the directory from where to execute the script.
846         """
847         command = self.install_packages_command(packages)
848
849         run_home = run_home or home
850
851         (out, err), proc = self.run_and_wait(command, run_home, 
852                                              shfile = os.path.join(home, "instpkg.sh"),
853                                              pidfile = "instpkg_pidfile",
854                                              ecodefile = "instpkg_exitcode",
855                                              stdout = "instpkg_stdout", 
856                                              stderr = "instpkg_stderr",
857                                              overwrite = False,
858                                              raise_on_error = raise_on_error)
859
860         return (out, err), proc 
861
862     def remove_packages(self, packages, home, run_home = None,
863                         raise_on_error = True):
864         """ Uninstall packages from the Linux host.
865
866         'home' is the directory to upload the package un-installation script.
867         'run_home' is the directory from where to execute the script.
868         """
869         if self.use_rpm:
870             command = rpmfuncs.remove_packages_command(self.os, packages)
871         elif self.use_deb:
872             command = debfuncs.remove_packages_command(self.os, packages)
873         else:
874             msg = "Error removing packages ( OS not known ) "
875             self.error(msg)
876             raise RuntimeError(msg)
877
878         run_home = run_home or home
879
880         (out, err), proc = self.run_and_wait(command, run_home, 
881                                              shfile = os.path.join(home, "rmpkg.sh"),
882                                              pidfile = "rmpkg_pidfile",
883                                              ecodefile = "rmpkg_exitcode",
884                                              stdout = "rmpkg_stdout", 
885                                              stderr = "rmpkg_stderr",
886                                              overwrite = False,
887                                              raise_on_error = raise_on_error)
888         
889         return (out, err), proc 
890
891     def mkdir(self, paths, clean = False):
892         """ Paths is either a single remote directory path to create,
893         or a list of directories to create.
894         """
895         if clean:
896             self.rmdir(paths)
897
898         if isinstance(paths, str):
899             paths = [paths]
900
901         cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
902
903         return self.execute(cmd, with_lock = True)
904
905     def rmdir(self, paths):
906         """ Paths is either a single remote directory path to delete,
907         or a list of directories to delete.
908         """
909
910         if isinstance(paths, str):
911             paths = [paths]
912
913         cmd = " ; ".join(["rm -rf {}".format(path) for path in paths])
914
915         return self.execute(cmd, with_lock = True)
916     
917     def run_and_wait(self, command, home, 
918                      shfile="cmd.sh",
919                      env=None,
920                      overwrite=True,
921                      wait_run=True,
922                      pidfile="pidfile", 
923                      ecodefile="exitcode", 
924                      stdin=None, 
925                      stdout="stdout", 
926                      stderr="stderr", 
927                      sudo=False,
928                      tty=False,
929                      raise_on_error=True):
930         """
931         Uploads the 'command' to a bash script in the host.
932         Then runs the script detached in background in the host, and
933         busy-waites until the script finishes executing.
934         """
935
936         if not shfile.startswith("/"):
937             shfile = os.path.join(home, shfile)
938
939         self.upload_command(command, 
940                             shfile = shfile, 
941                             ecodefile = ecodefile, 
942                             env = env,
943                             overwrite = overwrite)
944
945         command = "bash {}".format(shfile)
946         # run command in background in remote host
947         (out, err), proc = self.run(command, home, 
948                                     pidfile = pidfile,
949                                     stdin = stdin, 
950                                     stdout = stdout, 
951                                     stderr = stderr, 
952                                     sudo = sudo,
953                                     tty = tty)
954
955         # check no errors occurred
956         if proc.poll():
957             msg = " Failed to run command '{}' ".format(command)
958             self.error(msg, out, err)
959             if raise_on_error:
960                 raise RuntimeError(msg)
961
962         # Wait for pid file to be generated
963         pid, ppid = self.wait_pid(
964             home = home, 
965             pidfile = pidfile, 
966             raise_on_error = raise_on_error)
967
968         if wait_run:
969             # wait until command finishes to execute
970             self.wait_run(pid, ppid)
971             
972             (eout, err), proc = self.check_errors(home,
973                                                   ecodefile = ecodefile,
974                                                   stderr = stderr)
975
976             # Out is what was written in the stderr file
977             if err:
978                 msg = " Failed to run command '{}' ".format(command)
979                 self.error(msg, eout, err)
980
981                 if raise_on_error:
982                     raise RuntimeError(msg)
983
984         (out, oerr), proc = self.check_output(home, stdout)
985         
986         return (out, err), proc
987         
988     def exitcode(self, home, ecodefile = "exitcode"):
989         """
990         Get the exit code of an application.
991         Returns an integer value with the exit code 
992         """
993         (out, err), proc = self.check_output(home, ecodefile)
994
995         # Succeeded to open file, return exit code in the file
996         if proc.wait() == 0:
997             try:
998                 return int(out.strip())
999             except:
1000                 # Error in the content of the file!
1001                 return ExitCode.CORRUPTFILE
1002
1003         # No such file or directory
1004         if proc.returncode == 1:
1005             return ExitCode.FILENOTFOUND
1006         
1007         # Other error from 'cat'
1008         return ExitCode.ERROR
1009
1010     def upload_command(self, command, 
1011                        shfile="cmd.sh",
1012                        ecodefile="exitcode",
1013                        overwrite=True,
1014                        env=None):
1015         """ Saves the command as a bash script file in the remote host, and
1016         forces to save the exit code of the command execution to the ecodefile
1017         """
1018
1019         if not (command.strip().endswith(";") or command.strip().endswith("&")):
1020             command += ";"
1021             
1022         # The exit code of the command will be stored in ecodefile
1023         command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1024                   .format(command=command, ecodefile=ecodefile)
1025
1026         # Export environment
1027         environ = self.format_environment(env)
1028
1029         # Add environ to command
1030         command = environ + command
1031
1032         return self.upload(command, shfile, text=True, overwrite=overwrite)
1033
1034     def format_environment(self, env, inline=False):
1035         """ Formats the environment variables for a command to be executed
1036         either as an inline command
1037         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
1038         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
1039         """
1040         if not env: return ""
1041
1042         # Remove extra white spaces
1043         env = re.sub(r'\s+', ' ', env.strip())
1044
1045         sep = ";" if inline else "\n"
1046         return sep.join([" export {}".format(e) for e in env.split(" ")]) + sep 
1047
1048     def check_errors(self, home, 
1049                      ecodefile = "exitcode", 
1050                      stderr = "stderr"):
1051         """ Checks whether errors occurred while running a command.
1052         It first checks the exit code for the command, and only if the
1053         exit code is an error one it returns the error output.
1054
1055         """
1056         proc = None
1057         err = ""
1058
1059         # get exit code saved in the 'exitcode' file
1060         ecode = self.exitcode(home, ecodefile)
1061
1062         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1063             err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1064         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1065             # The process returned an error code or didn't exist. 
1066             # Check standard error.
1067             (err, eerr), proc = self.check_output(home, stderr)
1068
1069             # If the stderr file was not found, assume nothing bad happened,
1070             # and just ignore the error.
1071             # (cat returns 1 for error "No such file or directory")
1072             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1073                 err = "" 
1074                 
1075         return ("", err), proc
1076     
1077     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1078         """ Waits until the pid file for the command is generated, 
1079             and returns the pid and ppid of the process """
1080         pid = ppid = None
1081         delay = 1.0
1082
1083         for i in range(2):
1084             pidtuple = self.getpid(home = home, pidfile = pidfile)
1085             
1086             if pidtuple:
1087                 pid, ppid = pidtuple
1088                 break
1089             else:
1090                 time.sleep(delay)
1091                 delay = delay * 1.5
1092         else:
1093             msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1094             self.error(msg)
1095     
1096             if raise_on_error:
1097                 raise RuntimeError(msg)
1098
1099         return pid, ppid
1100
1101     def wait_run(self, pid, ppid, trial = 0):
1102         """ wait for a remote process to finish execution """
1103         delay = 1.0
1104
1105         while True:
1106             status = self.status(pid, ppid)
1107             
1108             if status is ProcStatus.FINISHED:
1109                 break
1110             elif status is not ProcStatus.RUNNING:
1111                 delay = delay * 1.5
1112                 time.sleep(delay)
1113                 # If it takes more than 20 seconds to start, then
1114                 # asume something went wrong
1115                 if delay > 20:
1116                     break
1117             else:
1118                 # The app is running, just wait...
1119                 time.sleep(0.5)
1120
1121     def check_output(self, home, filename):
1122         """ Retrives content of file """
1123         (out, err), proc = self.execute(
1124             "cat {}".format(os.path.join(home, filename)), retry = 1, with_lock = True)
1125         return (out, err), proc
1126
1127     def is_alive(self):
1128         """ Checks if host is responsive
1129         """
1130         if self.localhost:
1131             return True
1132
1133         out = err = ""
1134         msg = "Unresponsive host. Wrong answer. "
1135
1136         # The underlying SSH layer will sometimes return an empty
1137         # output (even if the command was executed without errors).
1138         # To work arround this, repeat the operation N times or
1139         # until the result is not empty string
1140         try:
1141             (out, err), proc = self.execute("echo 'ALIVE'",
1142                                             blocking = True,
1143                                             with_lock = True)
1144             
1145             if out.find("ALIVE") > -1:
1146                 return True
1147         except:
1148             trace = traceback.format_exc()
1149             msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1150
1151         self.error(msg, out, err)
1152         return False
1153
1154     def find_home(self):
1155         """ 
1156         Retrieves host home directory
1157         """
1158         # The underlying SSH layer will sometimes return an empty
1159         # output (even if the command was executed without errors).
1160         # To work arround this, repeat the operation N times or
1161         # until the result is not empty string
1162         msg = "Impossible to retrieve HOME directory"
1163         try:
1164             (out, err), proc = self.execute("echo ${HOME}",
1165                                             blocking = True,
1166                                             with_lock = True)
1167             
1168             if out.strip() != "":
1169                 self._home_dir =  out.strip()
1170         except:
1171             trace = traceback.format_exc()
1172             msg = "Impossible to retrieve HOME directory {}".format(trace)
1173
1174         if not self._home_dir:
1175             self.error(msg)
1176             raise RuntimeError(msg)
1177
1178     def filter_existing_files(self, src, dst):
1179         """ Removes files that already exist in the Linux host from src list
1180         """
1181         # construct a dictionary with { dst: src }
1182         dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1183                 if len(src) > 1 else {dst: src[0]}
1184
1185         command = []
1186         for d in dests:
1187             command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1188
1189         command = ";".join(command)
1190
1191         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1192         
1193         # avoid RuntimeError that would result from
1194         # changing loop subject during iteration
1195         keys = list(dests.keys())
1196         for d in keys:
1197             if out.find(d) > -1:
1198                 del dests[d]
1199
1200         if not dests:
1201             return []
1202
1203         retcod = dests.values()
1204         if PY3: retcod = list(retcod)
1205         return retcod