linux/{node,application}
[nepi.git] / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy, 
21                                      ResourceState)
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import stat
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 from six import PY3
37
38 # TODO: Unify delays!!
39 # TODO: Validate outcome of uploads!! 
40
41 class ExitCode:
42     """
43     Error codes that the rexitcode function can return if unable to
44     check the exit code of a spawned process
45     """
46     FILENOTFOUND = -1
47     CORRUPTFILE = -2
48     ERROR = -3
49     OK = 0
50
51 class OSType:
52     """
53     Supported flavors of Linux OS
54     """
55     DEBIAN = 1 
56     UBUNTU = 1 << 1 
57     FEDORA = 1 << 2
58     FEDORA_8 = 1 << 3 | FEDORA 
59     FEDORA_12 = 1 << 4 | FEDORA 
60     FEDORA_14 = 1 << 5 | FEDORA 
61
62 @clsinit_copy
63 class LinuxNode(ResourceManager):
64     """
65     .. class:: Class Args :
66       
67         :param ec: The Experiment controller
68         :type ec: ExperimentController
69         :param guid: guid of the RM
70         :type guid: int
71
72     .. note::
73
74         There are different ways in which commands can be executed using the
75         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
76         'run_and_wait'). 
77         
78         Brief explanation:
79
80             * 'execute' (blocking mode) :  
81
82                      HOW IT WORKS: 'execute', forks a process and run the
83                      command, synchronously, attached to the terminal, in
84                      foreground.
85                      The execute method will block until the command returns
86                      the result on 'out', 'err' (so until it finishes executing).
87   
88                      USAGE: short-lived commands that must be executed attached
89                      to a terminal and in foreground, for which it IS necessary
90                      to block until the command has finished (e.g. if you want
91                      to run 'ls' or 'cat').
92
93             * 'execute' (NON blocking mode - blocking = False) :
94
95                     HOW IT WORKS: Same as before, except that execute method
96                     will return immediately (even if command still running).
97
98                     USAGE: long-lived commands that must be executed attached
99                     to a terminal and in foreground, but for which it is not
100                     necessary to block until the command has finished. (e.g.
101                     start an application using X11 forwarding)
102
103              * 'run' :
104
105                    HOW IT WORKS: Connects to the host ( using SSH if remote)
106                    and launches the command in background, detached from any
107                    terminal (daemonized), and returns. The command continues to
108                    run remotely, but since it is detached from the terminal,
109                    its pipes (stdin, stdout, stderr) can't be redirected to the
110                    console (as normal non detached processes would), and so they
111                    are explicitly redirected to files. The pidfile is created as
112                    part of the process of launching the command. The pidfile
113                    holds the pid and ppid of the process forked in background,
114                    so later on it is possible to check whether the command is still
115                    running.
116
117                     USAGE: long-lived commands that can run detached in background,
118                     for which it is NOT necessary to block (wait) until the command
119                     has finished. (e.g. start an application that is not using X11
120                     forwarding. It can run detached and remotely in background)
121
122              * 'run_and_wait' :
123
124                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
125                     the command has finished execution. It also checks whether
126                     errors occurred during runtime by reading the exitcode file,
127                     which contains the exit code of the command that was run
128                     (checking stderr only is not always reliable since many
129                     commands throw debugging info to stderr and the only way to
130                     automatically know whether an error really happened is to
131                     check the process exit code).
132
133                     Another difference with respect to 'run', is that instead
134                     of directly executing the command as a bash command line,
135                     it uploads the command to a bash script and runs the script.
136                     This allows to use the bash script to debug errors, since
137                     it remains at the remote host and can be run manually to
138                     reproduce the error.
139                   
140                     USAGE: medium-lived commands that can run detached in
141                     background, for which it IS necessary to block (wait) until
142                     the command has finished. (e.g. Package installation,
143                     source compilation, file download, etc)
144
145     """
146     _rtype = "linux::Node"
147     _help = "Controls Linux host machines ( either localhost or a host " \
148             "that can be accessed using a SSH key)"
149     _platform = "linux"
150
151     @classmethod
152     def _register_attributes(cls):
153         cls._register_attribute(
154             Attribute("hostname",
155                       "Hostname of the machine",
156                       flags = Flags.Design))
157         cls._register_attribute(
158             Attribute("username",
159                       "Local account username", 
160                       flags = Flags.Credential))
161         cls._register_attribute(
162             Attribute("port",
163                       "SSH port",
164                       flags = Flags.Design))
165         cls._register_attribute(
166             Attribute("home",
167                       "Experiment home directory to store all experiment related files",
168                       flags = Flags.Design))
169         cls._register_attribute(
170             Attribute("identity",
171                       "SSH identity file",
172                       flags = Flags.Credential))
173         cls._register_attribute(
174             Attribute("serverKey",
175                       "Server public key", 
176                       flags = Flags.Design))
177         cls._register_attribute(
178             Attribute("cleanHome",
179                       "Remove all nepi files and directories "
180                       " from node home folder before starting experiment", 
181                       type = Types.Bool,
182                       default = False,
183             flags = Flags.Design))
184         cls._register_attribute(
185             Attribute("cleanExperiment",
186                       "Remove all files and directories " 
187                       " from a previous same experiment, before the new experiment starts", 
188                       type = Types.Bool,
189                       default = False,
190                       flags = Flags.Design))
191         cls._register_attribute(
192             Attribute("cleanProcesses",
193                       "Kill all running processes before starting experiment",
194                       type = Types.Bool,
195                       default = False,
196                       flags = Flags.Design))
197         cls._register_attribute(
198             Attribute("cleanProcessesAfter",
199                       "Kill all running processes after starting experiment"
200                       "NOTE: This might be dangerous when using user root",
201                       type = Types.Bool,
202                       default = True,
203                       flags = Flags.Design))
204         cls._register_attribute(
205             Attribute("tearDown",
206                       "Bash script to be executed before releasing the resource",
207                       flags = Flags.Design))
208         cls._register_attribute(
209             Attribute("gatewayUser",
210                       "Gateway account username",
211                       flags = Flags.Design))
212         cls._register_attribute(
213             Attribute("gateway",
214                       "Hostname of the gateway machine",
215                       flags = Flags.Design))
216         cls._register_attribute(
217             Attribute("ip",
218                       "Linux host public IP address. "
219                       "Must not be modified by the user unless hostname is 'localhost'",
220                       flags = Flags.Design))
221
222     def __init__(self, ec, guid):
223         super(LinuxNode, self).__init__(ec, guid)
224         self._os = None
225         # home directory at Linux host
226         self._home_dir = ""
227
228         # lock to prevent concurrent applications on the same node,
229         # to execute commands at the same time. There are potential
230         # concurrency issues when using SSH to a same host from 
231         # multiple threads. There are also possible operational 
232         # issues, e.g. an application querying the existence 
233         # of a file or folder prior to its creation, and another 
234         # application creating the same file or folder in between.
235         self._node_lock = threading.Lock()
236         
237     def log_message(self, msg):
238         return " guid {} - host {} - {} "\
239             .format(self.guid, self.get("hostname"), msg)
240
241     @property
242     def home_dir(self):
243         home = self.get("home") or ""
244         if not home.startswith("/"):
245             home = os.path.join(self._home_dir, home) 
246             return home
247
248     @property
249     def nepi_home(self):
250         return os.path.join(self.home_dir, ".nepi")
251
252     @property
253     def usr_dir(self):
254         return os.path.join(self.nepi_home, "nepi-usr")
255
256     @property
257     def lib_dir(self):
258         return os.path.join(self.usr_dir, "lib")
259
260     @property
261     def bin_dir(self):
262         return os.path.join(self.usr_dir, "bin")
263
264     @property
265     def src_dir(self):
266         return os.path.join(self.usr_dir, "src")
267
268     @property
269     def share_dir(self):
270         return os.path.join(self.usr_dir, "share")
271
272     @property
273     def exp_dir(self):
274         return os.path.join(self.nepi_home, "nepi-exp")
275
276     @property
277     def exp_home(self):
278         return os.path.join(self.exp_dir, self.ec.exp_id)
279
280     @property
281     def node_home(self):
282         return os.path.join(self.exp_home, "node-{}".format(self.guid))
283
284     @property
285     def run_home(self):
286         return os.path.join(self.node_home, self.ec.run_id)
287
288     @property
289     def os(self):
290         if self._os:
291             return self._os
292
293         if not self.localhost and not self.get("username"):
294             msg = "Can't resolve OS, insufficient data "
295             self.error(msg)
296             raise RuntimeError(msg)
297
298         out = self.get_os()
299
300         if out.find("Debian") == 0: 
301             self._os = OSType.DEBIAN
302         elif out.find("Ubuntu") == 0:
303             self._os = OSType.UBUNTU
304         elif out.find("Fedora release") == 0:
305             self._os = OSType.FEDORA
306             if out.find("Fedora release 8") == 0:
307                 self._os = OSType.FEDORA_8
308             elif out.find("Fedora release 12") == 0:
309                 self._os = OSType.FEDORA_12
310             elif out.find("Fedora release 14") == 0:
311                 self._os = OSType.FEDORA_14
312         else:
313             msg = "Unsupported OS"
314             self.error(msg, out)
315             raise RuntimeError("{} - {} ".format(msg, out))
316
317         return self._os
318
319     def get_os(self):
320         # The underlying SSH layer will sometimes return an empty
321         # output (even if the command was executed without errors).
322         # To work arround this, repeat the operation N times or
323         # until the result is not empty string
324         out = ""
325         try:
326             (out, err), proc = self.execute("cat /etc/issue", 
327                                             with_lock = True,
328                                             blocking = True)
329         except:
330             trace = traceback.format_exc()
331             msg = "Error detecting OS: {} ".format(trace)
332             self.error(msg, out, err)
333     
334         return out
335
336     @property
337     def use_deb(self):
338         return (self.os & (OSType.DEBIAN | OSType.UBUNTU))
339
340     @property
341     def use_rpm(self):
342         return (self.os & OSType.FEDORA)
343
344     @property
345     def localhost(self):
346         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
347
348     def do_provision(self):
349         # check if host is alive
350         if not self.is_alive():
351             trace = traceback.format_exc()
352             msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
353             self.error(msg)
354             raise RuntimeError(msg)
355
356         self.find_home()
357
358         if self.get("cleanProcesses"):
359             self.clean_processes()
360
361         if self.get("cleanHome"):
362             self.clean_home()
363             
364         if self.get("cleanExperiment"):
365             self.clean_experiment()
366             
367         # Create shared directory structure and node home directory
368         paths = [self.lib_dir, 
369                  self.bin_dir, 
370                  self.src_dir, 
371                  self.share_dir, 
372                  self.node_home]
373
374         self.mkdir(paths)
375
376         # Get Public IP address if possible
377         if not self.get("ip"):
378             try:
379                 ip = sshfuncs.gethostbyname(self.get("hostname"))
380                 self.set("ip", ip)
381             except:
382                 if self.get("gateway") is None:
383                     msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
384                     self.error(msg)
385
386         super(LinuxNode, self).do_provision()
387
388     def do_deploy(self):
389         if self.state == ResourceState.NEW:
390             self.info("Deploying node")
391             self.do_discover()
392             self.do_provision()
393
394         # Node needs to wait until all associated interfaces are 
395         # ready before it can finalize deployment
396         from nepi.resources.linux.interface import LinuxInterface
397         ifaces = self.get_connected(LinuxInterface.get_rtype())
398         for iface in ifaces:
399             if iface.state < ResourceState.READY:
400                 self.ec.schedule(self.reschedule_delay, self.deploy)
401                 return 
402
403         super(LinuxNode, self).do_deploy()
404
405     def do_release(self):
406         rms = self.get_connected()
407         for rm in rms:
408             # Node needs to wait until all associated RMs are released
409             # before it can be released
410             if rm.state != ResourceState.RELEASED:
411                 self.ec.schedule(self.reschedule_delay, self.release)
412                 return 
413
414         tear_down = self.get("tearDown")
415         if tear_down:
416             self.execute(tear_down)
417
418         if self.get("cleanProcessesAfter"):
419             self.clean_processes()
420
421         super(LinuxNode, self).do_release()
422
423     def valid_connection(self, guid):
424         # TODO: Validate!
425         return True
426
427     def clean_processes(self):
428         self.info("Cleaning up processes")
429
430         if self.localhost:
431             return 
432         
433         if self.get("username") != 'root':
434             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
435                    "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
436                    "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
437         else:
438             if self.state >= ResourceState.READY:
439                 ########################
440                 #Collect all process (must change for a more intelligent way)
441                 ppid = []
442                 pids = []
443                 avoid_pids = "ps axjf | awk '{print $1,$2}'"
444                 (out, err), proc = self.execute(avoid_pids)
445                 if len(out) != 0:
446                     for line in out.strip().split("\n"):
447                         parts = line.strip().split(" ")
448                         ppid.append(parts[0])
449                         pids.append(parts[1])
450
451                 #Collect all process below ssh -D
452                 tree_owner = 0
453                 ssh_pids = []
454                 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
455                 (out, err), proc = self.execute(sshs)
456                 if len(out) != 0:
457                     for line in out.strip().split("\n"):
458                         parts = line.strip().split(" ")
459                         if parts[1].startswith('root@pts'):
460                             ssh_pids.append(parts[0])
461                         elif parts[1] == "-D":
462                             tree_owner = parts[0]
463
464                 avoid_kill = []
465                 temp = []
466                 #Search for the child process of the pid's collected at the first block.
467                 for process in ssh_pids:
468                     temp = self.search_for_child(process, pids, ppid)
469                     avoid_kill = list(set(temp))
470                 
471                 if len(avoid_kill) > 0:
472                     avoid_kill.append(tree_owner) 
473                 ########################
474
475                 import pickle
476                 with open("/tmp/save.proc", "rb") as pickle_file:
477                     pids = pickle.load(pickle_file)
478                 pids_temp = dict()
479                 ps_aux = "ps aux | awk '{print $2,$11}'"
480                 (out, err), proc = self.execute(ps_aux)
481                 if len(out) != 0:
482                     for line in out.strip().split("\n"):
483                         parts = line.strip().split(" ")
484                         pids_temp[parts[0]] = parts[1]
485                     # creates the difference between the machine pids freezed (pickle) and the actual
486                     # adding the avoided pids filtered above (avoid_kill) to allow users keep process
487                     # alive when using besides ssh connections  
488                     kill_pids = set(pids_temp.items()) - set(pids.items())
489                     # py2/py3 : keep it simple
490                     kill_pids = ' '.join(kill_pids)
491
492                     # removing pids from beside connections and its process
493                     kill_pids = kill_pids.split(' ')
494                     kill_pids = list(set(kill_pids) - set(avoid_kill))
495                     kill_pids = ' '.join(kill_pids)
496
497                     cmd = ("killall tcpdump || /bin/true ; " +
498                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
499                            "kill {} || /bin/true ; ".format(kill_pids))
500                 else:
501                     cmd = ("killall tcpdump || /bin/true ; " +
502                            "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
503             else:
504                 cmd = ("killall tcpdump || /bin/true ; " +
505                        "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
506
507         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
508
509     def search_for_child(self, pid, pids, ppid, family=None):
510         """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
511         """
512         family = family if family is not None else []
513         family.append(pid)
514         for key, value in enumerate(ppid):
515             if value == pid:
516                 child = pids[key]
517                 self.search_for_child(child, pids, ppid)
518         return family
519         
520     def clean_home(self):
521         """ Cleans all NEPI related folders in the Linux host
522         """
523         self.info("Cleaning up home")
524         
525         cmd = "cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "\
526               .format(self.home_dir)
527
528         return self.execute(cmd, with_lock = True)
529
530     def clean_experiment(self):
531         """ Cleans all experiment related files in the Linux host.
532         It preserves NEPI files and folders that have a multi experiment
533         scope.
534         """
535         self.info("Cleaning up experiment files")
536         
537         cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "\
538               .format(self.exp_dir, self.ec.exp_id)
539         
540         return self.execute(cmd, with_lock = True)
541
542     def execute(self, command,
543                 sudo = False,
544                 env = None,
545                 tty = False,
546                 forward_x11 = False,
547                 retry = 3,
548                 connect_timeout = 30,
549                 strict_host_checking = False,
550                 persistent = True,
551                 blocking = True,
552                 with_lock = False
553     ):
554         """ Notice that this invocation will block until the
555         execution finishes. If this is not the desired behavior,
556         use 'run' instead."""
557
558         if self.localhost:
559             (out, err), proc = execfuncs.lexec(
560                 command, 
561                 user = self.get("username"), # still problem with localhost
562                 sudo = sudo,
563                 env = env)
564         else:
565             if with_lock:
566                 # If the execute command is blocking, we don't want to keep
567                 # the node lock. This lock is used to avoid race conditions
568                 # when creating the ControlMaster sockets. A more elegant
569                 # solution is needed.
570                 with self._node_lock:
571                     (out, err), proc = sshfuncs.rexec(
572                         command, 
573                         host = self.get("hostname"),
574                         user = self.get("username"),
575                         port = self.get("port"),
576                         gwuser = self.get("gatewayUser"),
577                         gw = self.get("gateway"),
578                         agent = True,
579                         sudo = sudo,
580                         identity = self.get("identity"),
581                         server_key = self.get("serverKey"),
582                         env = env,
583                         tty = tty,
584                         forward_x11 = forward_x11,
585                         retry = retry,
586                         connect_timeout = connect_timeout,
587                         persistent = persistent,
588                         blocking = blocking, 
589                         strict_host_checking = strict_host_checking
590                     )
591             else:
592                 (out, err), proc = sshfuncs.rexec(
593                     command, 
594                     host = self.get("hostname"),
595                     user = self.get("username"),
596                     port = self.get("port"),
597                     gwuser = self.get("gatewayUser"),
598                     gw = self.get("gateway"),
599                     agent = True,
600                     sudo = sudo,
601                     identity = self.get("identity"),
602                     server_key = self.get("serverKey"),
603                     env = env,
604                     tty = tty,
605                     forward_x11 = forward_x11,
606                     retry = retry,
607                     connect_timeout = connect_timeout,
608                     persistent = persistent,
609                     blocking = blocking, 
610                     strict_host_checking = strict_host_checking
611                 )
612
613         return (out, err), proc
614
615     def run(self, command, home,
616             create_home = False,
617             pidfile = 'pidfile',
618             stdin = None, 
619             stdout = 'stdout', 
620             stderr = 'stderr', 
621             sudo = False,
622             tty = False,
623             strict_host_checking = False):
624         
625         self.debug("Running command '{}'".format(command))
626         
627         if self.localhost:
628             (out, err), proc = execfuncs.lspawn(
629                 command, pidfile,
630                 home = home, 
631                 create_home = create_home, 
632                 stdin = stdin or '/dev/null',
633                 stdout = stdout or '/dev/null',
634                 stderr = stderr or '/dev/null',
635                 sudo = sudo) 
636         else:
637             with self._node_lock:
638                 (out, err), proc = sshfuncs.rspawn(
639                     command,
640                     pidfile = pidfile,
641                     home = home,
642                     create_home = create_home,
643                     stdin = stdin or '/dev/null',
644                     stdout = stdout or '/dev/null',
645                     stderr = stderr or '/dev/null',
646                     sudo = sudo,
647                     host = self.get("hostname"),
648                     user = self.get("username"),
649                     port = self.get("port"),
650                     gwuser = self.get("gatewayUser"),
651                     gw = self.get("gateway"),
652                     agent = True,
653                     identity = self.get("identity"),
654                     server_key = self.get("serverKey"),
655                     tty = tty,
656                     strict_host_checking = strict_host_checking
657                 )
658
659         return (out, err), proc
660
661     def getpid(self, home, pidfile = "pidfile"):
662         if self.localhost:
663             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
664         else:
665             with self._node_lock:
666                 pidtuple = sshfuncs.rgetpid(
667                     os.path.join(home, pidfile),
668                     host = self.get("hostname"),
669                     user = self.get("username"),
670                     port = self.get("port"),
671                     gwuser = self.get("gatewayUser"),
672                     gw = self.get("gateway"),
673                     agent = True,
674                     identity = self.get("identity"),
675                     server_key = self.get("serverKey"),
676                     strict_host_checking = False
677                 )
678         
679         return pidtuple
680
681     def status(self, pid, ppid):
682         if self.localhost:
683             status = execfuncs.lstatus(pid, ppid)
684         else:
685             with self._node_lock:
686                 status = sshfuncs.rstatus(
687                     pid, ppid,
688                     host = self.get("hostname"),
689                     user = self.get("username"),
690                     port = self.get("port"),
691                     gwuser = self.get("gatewayUser"),
692                     gw = self.get("gateway"),
693                     agent = True,
694                     identity = self.get("identity"),
695                     server_key = self.get("serverKey"),
696                     strict_host_checking = False
697                 )
698            
699         return status
700     
701     def kill(self, pid, ppid, sudo = False):
702         out = err = ""
703         proc = None
704         status = self.status(pid, ppid)
705
706         if status == sshfuncs.ProcStatus.RUNNING:
707             if self.localhost:
708                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
709             else:
710                 with self._node_lock:
711                     (out, err), proc = sshfuncs.rkill(
712                         pid, ppid,
713                         host = self.get("hostname"),
714                         user = self.get("username"),
715                         port = self.get("port"),
716                         gwuser = self.get("gatewayUser"),
717                         gw = self.get("gateway"),
718                         agent = True,
719                         sudo = sudo,
720                         identity = self.get("identity"),
721                         server_key = self.get("serverKey"),
722                         strict_host_checking = False
723                     )
724
725         return (out, err), proc
726
727     def copy(self, src, dst):
728         if self.localhost:
729             (out, err), proc = execfuncs.lcopy(
730                 src, dst, 
731                 recursive = True)
732         else:
733             with self._node_lock:
734                 (out, err), proc = sshfuncs.rcopy(
735                     src, dst, 
736                     port = self.get("port"),
737                     gwuser = self.get("gatewayUser"),
738                     gw = self.get("gateway"),
739                     identity = self.get("identity"),
740                     server_key = self.get("serverKey"),
741                     recursive = True,
742                     strict_host_checking = False)
743
744         return (out, err), proc
745
746     def upload(self, src, dst, text = False, overwrite = True,
747                raise_on_error = True, executable = False):
748         """ Copy content to destination
749
750         src  string with the content to copy. Can be:
751             - plain text
752             - a string with the path to a local file
753             - a string with a semi-colon separeted list of local files
754             - a string with a local directory
755
756         dst  string with destination path on the remote host (remote is 
757             always self.host)
758
759         when src is text input, it gets stored into a temp file before 
760         uploading; in this case, and if executable is True, said temp file
761         is made executable, and thus uploaded file will be too
762         """
763         # If source is a string input 
764         f = None
765         if text and not os.path.isfile(src):
766             # src is text input that should be uploaded as file
767             # create a temporal file with the content to upload
768             # in python3 we need to open in binary mode if str is bytes
769             mode = 'w' if isinstance(src, str) else 'wb'
770             f = tempfile.NamedTemporaryFile(mode = mode, delete = False)
771             f.write(src)
772             f.close()
773             if executable:
774                 # do something like chmod u+x
775                 mode = os.stat(f.name).st_mode
776                 mode |= stat.S_IXUSR
777                 os.chmod(f.name, mode)
778                 
779             src = f.name
780
781         # If dst files should not be overwritten, check that the files do not
782         # exist already
783         if isinstance(src, str):
784             src = [s.strip() for s in src.split(";")]
785     
786         if overwrite == False:
787             src = self.filter_existing_files(src, dst)
788             if not src:
789                 return ("", ""), None
790
791         if not self.localhost:
792             # Build destination as <user>@<server>:<path>
793             dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
794
795         ((out, err), proc) = self.copy(src, dst)
796
797         # clean up temp file
798         if f:
799             os.remove(f.name)
800
801         if err:
802             msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
803             self.error(msg, out, err)
804             
805             msg = "{} out: {} err: {}".format(msg, out, err)
806             if raise_on_error:
807                 raise RuntimeError(msg)
808
809         return ((out, err), proc)
810
811     def download(self, src, dst, raise_on_error = True):
812         if not self.localhost:
813             # Build destination as <user>@<server>:<path>
814             src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
815
816         ((out, err), proc) = self.copy(src, dst)
817
818         if err:
819             msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst) 
820             self.error(msg, out, err)
821
822             if raise_on_error:
823                 raise RuntimeError(msg)
824
825         return ((out, err), proc)
826
827     def install_packages_command(self, packages):
828         command = ""
829         if self.use_rpm:
830             command = rpmfuncs.install_packages_command(self.os, packages)
831         elif self.use_deb:
832             command = debfuncs.install_packages_command(self.os, packages)
833         else:
834             msg = "Error installing packages ( OS not known ) "
835             self.error(msg, self.os)
836             raise RuntimeError(msg)
837
838         return command
839
840     def install_packages(self, packages, home,
841                          run_home = None,
842                          raise_on_error = True):
843         """ Install packages in the Linux host.
844
845         'home' is the directory to upload the package installation script.
846         'run_home' is the directory from where to execute the script.
847         """
848         command = self.install_packages_command(packages)
849
850         run_home = run_home or home
851
852         (out, err), proc = self.run_and_wait(command, run_home, 
853                                              shfile = os.path.join(home, "instpkg.sh"),
854                                              pidfile = "instpkg_pidfile",
855                                              ecodefile = "instpkg_exitcode",
856                                              stdout = "instpkg_stdout", 
857                                              stderr = "instpkg_stderr",
858                                              overwrite = False,
859                                              raise_on_error = raise_on_error)
860
861         return (out, err), proc 
862
863     def remove_packages(self, packages, home, run_home = None,
864                         raise_on_error = True):
865         """ Uninstall packages from the Linux host.
866
867         'home' is the directory to upload the package un-installation script.
868         'run_home' is the directory from where to execute the script.
869         """
870         if self.use_rpm:
871             command = rpmfuncs.remove_packages_command(self.os, packages)
872         elif self.use_deb:
873             command = debfuncs.remove_packages_command(self.os, packages)
874         else:
875             msg = "Error removing packages ( OS not known ) "
876             self.error(msg)
877             raise RuntimeError(msg)
878
879         run_home = run_home or home
880
881         (out, err), proc = self.run_and_wait(command, run_home, 
882                                              shfile = os.path.join(home, "rmpkg.sh"),
883                                              pidfile = "rmpkg_pidfile",
884                                              ecodefile = "rmpkg_exitcode",
885                                              stdout = "rmpkg_stdout", 
886                                              stderr = "rmpkg_stderr",
887                                              overwrite = False,
888                                              raise_on_error = raise_on_error)
889         
890         return (out, err), proc 
891
892     def mkdir(self, paths, clean = False):
893         """ Paths is either a single remote directory path to create,
894         or a list of directories to create.
895         """
896         if clean:
897             self.rmdir(paths)
898
899         if isinstance(paths, str):
900             paths = [paths]
901
902         cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
903
904         return self.execute(cmd, with_lock = True)
905
906     def rmdir(self, paths):
907         """ Paths is either a single remote directory path to delete,
908         or a list of directories to delete.
909         """
910
911         if isinstance(paths, str):
912             paths = [paths]
913
914         cmd = " ; ".join(["rm -rf {}".format(path) for path in paths])
915
916         return self.execute(cmd, with_lock = True)
917     
918     def run_and_wait(self, command, home, 
919                      shfile = "cmd.sh",
920                      env = None,
921                      overwrite = True,
922                      wait_run = True,
923                      pidfile = "pidfile", 
924                      ecodefile = "exitcode", 
925                      stdin = None, 
926                      stdout = "stdout", 
927                      stderr = "stderr", 
928                      sudo = False,
929                      tty = False,
930                      raise_on_error = True):
931         """
932         Uploads the 'command' to a bash script in the host.
933         Then runs the script detached in background in the host, and
934         busy-waites until the script finishes executing.
935         """
936
937         if not shfile.startswith("/"):
938             shfile = os.path.join(home, shfile)
939
940         self.upload_command(command, 
941                             shfile = shfile, 
942                             ecodefile = ecodefile, 
943                             env = env,
944                             overwrite = overwrite)
945
946         command = "bash {}".format(shfile)
947         # run command in background in remote host
948         (out, err), proc = self.run(command, home, 
949                                     pidfile = pidfile,
950                                     stdin = stdin, 
951                                     stdout = stdout, 
952                                     stderr = stderr, 
953                                     sudo = sudo,
954                                     tty = tty)
955
956         # check no errors occurred
957         if proc.poll():
958             msg = " Failed to run command '{}' ".format(command)
959             self.error(msg, out, err)
960             if raise_on_error:
961                 raise RuntimeError(msg)
962
963         # Wait for pid file to be generated
964         pid, ppid = self.wait_pid(
965             home = home, 
966             pidfile = pidfile, 
967             raise_on_error = raise_on_error)
968
969         if wait_run:
970             # wait until command finishes to execute
971             self.wait_run(pid, ppid)
972             
973             (eout, err), proc = self.check_errors(home,
974                                                   ecodefile = ecodefile,
975                                                   stderr = stderr)
976
977             # Out is what was written in the stderr file
978             if err:
979                 msg = " Failed to run command '{}' ".format(command)
980                 self.error(msg, eout, err)
981
982                 if raise_on_error:
983                     raise RuntimeError(msg)
984
985         (out, oerr), proc = self.check_output(home, stdout)
986         
987         return (out, err), proc
988         
989     def exitcode(self, home, ecodefile = "exitcode"):
990         """
991         Get the exit code of an application.
992         Returns an integer value with the exit code 
993         """
994         (out, err), proc = self.check_output(home, ecodefile)
995
996         # Succeeded to open file, return exit code in the file
997         if proc.wait() == 0:
998             try:
999                 return int(out.strip())
1000             except:
1001                 # Error in the content of the file!
1002                 return ExitCode.CORRUPTFILE
1003
1004         # No such file or directory
1005         if proc.returncode == 1:
1006             return ExitCode.FILENOTFOUND
1007         
1008         # Other error from 'cat'
1009         return ExitCode.ERROR
1010
1011     def upload_command(self, command, 
1012                        shfile = "cmd.sh",
1013                        ecodefile = "exitcode",
1014                        overwrite = True,
1015                        env = None):
1016         """ Saves the command as a bash script file in the remote host, and
1017         forces to save the exit code of the command execution to the ecodefile
1018         """
1019
1020         if not (command.strip().endswith(";") or command.strip().endswith("&")):
1021             command += ";"
1022             
1023         # The exit code of the command will be stored in ecodefile
1024         command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1025                   .format(command = command, ecodefile = ecodefile)
1026
1027         # Export environment
1028         environ = self.format_environment(env)
1029
1030         # Add environ to command
1031         command = environ + command
1032
1033         return self.upload(command, shfile, text = True, overwrite = overwrite)
1034
1035     def format_environment(self, env, inline = False):
1036         """ Formats the environment variables for a command to be executed
1037         either as an inline command
1038         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
1039         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
1040         """
1041         if not env: return ""
1042
1043         # Remove extra white spaces
1044         env = re.sub(r'\s+', ' ', env.strip())
1045
1046         sep = ";" if inline else "\n"
1047         return sep.join([" export {}".format(e) for e in env.split(" ")]) + sep 
1048
1049     def check_errors(self, home, 
1050                      ecodefile = "exitcode", 
1051                      stderr = "stderr"):
1052         """ Checks whether errors occurred while running a command.
1053         It first checks the exit code for the command, and only if the
1054         exit code is an error one it returns the error output.
1055
1056         """
1057         proc = None
1058         err = ""
1059
1060         # get exit code saved in the 'exitcode' file
1061         ecode = self.exitcode(home, ecodefile)
1062
1063         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1064             err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1065         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1066             # The process returned an error code or didn't exist. 
1067             # Check standard error.
1068             (err, eerr), proc = self.check_output(home, stderr)
1069
1070             # If the stderr file was not found, assume nothing bad happened,
1071             # and just ignore the error.
1072             # (cat returns 1 for error "No such file or directory")
1073             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1074                 err = "" 
1075                 
1076         return ("", err), proc
1077     
1078     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1079         """ Waits until the pid file for the command is generated, 
1080             and returns the pid and ppid of the process """
1081         pid = ppid = None
1082         delay = 1.0
1083
1084         for i in range(2):
1085             pidtuple = self.getpid(home = home, pidfile = pidfile)
1086             
1087             if pidtuple:
1088                 pid, ppid = pidtuple
1089                 break
1090             else:
1091                 time.sleep(delay)
1092                 delay = delay * 1.5
1093         else:
1094             msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1095             self.error(msg)
1096     
1097             if raise_on_error:
1098                 raise RuntimeError(msg)
1099
1100         return pid, ppid
1101
1102     def wait_run(self, pid, ppid, trial = 0):
1103         """ wait for a remote process to finish execution """
1104         delay = 1.0
1105
1106         while True:
1107             status = self.status(pid, ppid)
1108             
1109             if status is ProcStatus.FINISHED:
1110                 break
1111             elif status is not ProcStatus.RUNNING:
1112                 delay = delay * 1.5
1113                 time.sleep(delay)
1114                 # If it takes more than 20 seconds to start, then
1115                 # asume something went wrong
1116                 if delay > 20:
1117                     break
1118             else:
1119                 # The app is running, just wait...
1120                 time.sleep(0.5)
1121
1122     def check_output(self, home, filename):
1123         """ Retrives content of file """
1124         (out, err), proc = self.execute(
1125             "cat {}".format(os.path.join(home, filename)), retry = 1, with_lock = True)
1126         return (out, err), proc
1127
1128     def is_alive(self):
1129         """ Checks if host is responsive
1130         """
1131         if self.localhost:
1132             return True
1133
1134         out = err = ""
1135         msg = "Unresponsive host. Wrong answer. "
1136
1137         # The underlying SSH layer will sometimes return an empty
1138         # output (even if the command was executed without errors).
1139         # To work arround this, repeat the operation N times or
1140         # until the result is not empty string
1141         try:
1142             (out, err), proc = self.execute("echo 'ALIVE'",
1143                                             blocking = True,
1144                                             with_lock = True)
1145             
1146             if out.find("ALIVE") > -1:
1147                 return True
1148         except:
1149             trace = traceback.format_exc()
1150             msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1151
1152         self.error(msg, out, err)
1153         return False
1154
1155     def find_home(self):
1156         """ 
1157         Retrieves host home directory
1158         """
1159         # The underlying SSH layer will sometimes return an empty
1160         # output (even if the command was executed without errors).
1161         # To work arround this, repeat the operation N times or
1162         # until the result is not empty string
1163         msg = "Impossible to retrieve HOME directory"
1164         try:
1165             (out, err), proc = self.execute("echo ${HOME}",
1166                                             blocking = True,
1167                                             with_lock = True)
1168             
1169             if out.strip() != "":
1170                 self._home_dir =  out.strip()
1171         except:
1172             trace = traceback.format_exc()
1173             msg = "Impossible to retrieve HOME directory {}".format(trace)
1174
1175         if not self._home_dir:
1176             self.error(msg)
1177             raise RuntimeError(msg)
1178
1179     def filter_existing_files(self, src, dst):
1180         """ Removes files that already exist in the Linux host from src list
1181         """
1182         # construct a dictionary with { dst: src }
1183         dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1184                 if len(src) > 1 else {dst: src[0]}
1185
1186         command = []
1187         for d in dests:
1188             command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst = d) )
1189
1190         command = ";".join(command)
1191
1192         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1193         
1194         # avoid RuntimeError that would result from
1195         # changing loop subject during iteration
1196         keys = list(dests.keys())
1197         for d in keys:
1198             if out.find(d) > -1:
1199                 del dests[d]
1200
1201         if not dests:
1202             return []
1203
1204         retcod = dests.values()
1205         if PY3: retcod = list(retcod)
1206         return retcod