Replacing RM.rtype() for RM.get_type() for consistency
[nepi.git] / src / nepi / resources / linux / node.py
index 5080a6c..cbc0099 100644 (file)
@@ -17,9 +17,9 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        reschedule_delay
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay
 from nepi.resources.linux import rpmfuncs, debfuncs 
 from nepi.util import sshfuncs, execfuncs
 from nepi.util.sshfuncs import ProcStatus
@@ -57,7 +57,7 @@ class OSType:
     UBUNTU = "ubuntu"
     DEBIAN = "debian"
 
-@clsinit
+@clsinit_copy
 class LinuxNode(ResourceManager):
     """
     .. class:: Class Args :
@@ -142,6 +142,9 @@ class LinuxNode(ResourceManager):
 
     """
     _rtype = "LinuxNode"
+    _help = "Controls Linux host machines ( either localhost or a host " \
+            "that can be accessed using a SSH key)"
+    _backend_type = "linux"
 
     @classmethod
     def _register_attributes(cls):
@@ -165,14 +168,20 @@ class LinuxNode(ResourceManager):
         
         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
                 " from node home folder before starting experiment", 
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
 
         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
                 " from a previous same experiment, before the new experiment starts", 
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
         
         clean_processes = Attribute("cleanProcesses", 
                 "Kill all running processes before starting experiment",
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
@@ -196,8 +205,14 @@ class LinuxNode(ResourceManager):
         # home directory at Linux host
         self._home_dir = ""
         
-        # lock to avoid concurrency issues on methods used by applications 
-        self._lock = threading.Lock()
+        # lock to prevent concurrent applications on the same node,
+        # to execute commands at the same time. There are potential
+        # concurrency issues when using SSH to a same host from 
+        # multiple threads. There are also possible operational 
+        # issues, e.g. an application querying the existence 
+        # of a file or folder prior to its creation, and another 
+        # application creating the same file or folder in between.
+        self._node_lock = threading.Lock()
     
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
@@ -256,12 +271,7 @@ class LinuxNode(ResourceManager):
             self.error(msg)
             raise RuntimeError, msg
 
-        (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
-
-        if err and proc.poll():
-            msg = "Error detecting OS "
-            self.error(msg, out, err)
-            raise RuntimeError, "%s - %s - %s" %( msg, out, err )
+        out = self.get_os()
 
         if out.find("Fedora release 8") == 0:
             self._os = OSType.FEDORA_8
@@ -269,6 +279,8 @@ class LinuxNode(ResourceManager):
             self._os = OSType.FEDORA_12
         elif out.find("Fedora release 14") == 0:
             self._os = OSType.FEDORA_14
+        elif out.find("Fedora release") == 0:
+            self._os = OSType.FEDORA
         elif out.find("Debian") == 0: 
             self._os = OSType.DEBIAN
         elif out.find("Ubuntu") ==0:
@@ -280,6 +292,23 @@ class LinuxNode(ResourceManager):
 
         return self._os
 
+    def get_os(self):
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work arround this, repeat the operation N times or
+        # until the result is not empty string
+        out = ""
+        try:
+            (out, err), proc = self.execute("cat /etc/issue", 
+                    with_lock = True,
+                    blocking = True)
+        except:
+            trace = traceback.format_exc()
+            msg = "Error detecting OS: %s " % trace
+            self.error(msg, out, err)
+        
+        return out
+
     @property
     def use_deb(self):
         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
@@ -293,11 +322,9 @@ class LinuxNode(ResourceManager):
     def localhost(self):
         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
 
-    def provision(self):
+    def do_provision(self):
         # check if host is alive
         if not self.is_alive():
-            self.fail()
-            
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
             raise RuntimeError, msg
@@ -322,36 +349,41 @@ class LinuxNode(ResourceManager):
         # Create experiment node home directory
         self.mkdir(self.node_home)
 
-        super(LinuxNode, self).provision()
+        super(LinuxNode, self).do_provision()
 
-    def deploy(self):
+    def do_deploy(self):
         if self.state == ResourceState.NEW:
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self._state = ResourceState.FAILED
-                raise
+            self.info("Deploying node")
+            self.do_discover()
+            self.do_provision()
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
         from nepi.resources.linux.interface import LinuxInterface
-        ifaces = self.get_connected(LinuxInterface.rtype())
+        ifaces = self.get_connected(LinuxInterface.get_rtype())
         for iface in ifaces:
             if iface.state < ResourceState.READY:
                 self.ec.schedule(reschedule_delay, self.deploy)
                 return 
 
-        super(LinuxNode, self).deploy()
+        super(LinuxNode, self).do_deploy()
+
+    def do_release(self):
+        rms = self.get_connected()
+        for rm in rms:
+            # Node needs to wait until all associated RMs are released
+            # before it can be released
+            if rm.state != ResourceState.RELEASED:
+                self.ec.schedule(reschedule_delay, self.release)
+                return 
 
-    def release(self):
         tear_down = self.get("tearDown")
         if tear_down:
             self.execute(tear_down)
 
         self.clean_processes()
 
-        super(LinuxNode, self).release()
+        super(LinuxNode, self).do_release()
 
     def valid_connection(self, guid):
         # TODO: Validate!
@@ -375,8 +407,8 @@ class LinuxNode(ResourceManager):
                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
 
         out = err = ""
-        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
-            
+        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
+
     def clean_home(self):
         """ Cleans all NEPI related folders in the Linux host
         """
@@ -427,7 +459,7 @@ class LinuxNode(ResourceManager):
                     env = env)
         else:
             if with_lock:
-                with self._lock:
+                with self._node_lock:
                     (out, err), proc = sshfuncs.rexec(
                         command, 
                         host = self.get("hostname"),
@@ -495,7 +527,7 @@ class LinuxNode(ResourceManager):
                     sudo = sudo,
                     user = user) 
         else:
-            with self._lock:
+            with self._node_lock:
                 (out, err), proc = sshfuncs.rspawn(
                     command,
                     pidfile = pidfile,
@@ -520,7 +552,7 @@ class LinuxNode(ResourceManager):
         if self.localhost:
             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
         else:
-            with self._lock:
+            with self._node_lock:
                 pidtuple = sshfuncs.rgetpid(
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
@@ -537,7 +569,7 @@ class LinuxNode(ResourceManager):
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
         else:
-            with self._lock:
+            with self._node_lock:
                 status = sshfuncs.rstatus(
                         pid, ppid,
                         host = self.get("hostname"),
@@ -559,7 +591,7 @@ class LinuxNode(ResourceManager):
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
-                with self._lock:
+                with self._node_lock:
                     (out, err), proc = sshfuncs.rkill(
                         pid, ppid,
                         host = self.get("hostname"),
@@ -579,7 +611,7 @@ class LinuxNode(ResourceManager):
                     recursive = True,
                     strict_host_checking = False)
         else:
-            with self._lock:
+            with self._node_lock:
                 (out, err), proc = sshfuncs.rcopy(
                     src, dst, 
                     port = self.get("port"),
@@ -590,7 +622,6 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
-
     def upload(self, src, dst, text = False, overwrite = True):
         """ Copy content to destination
 
@@ -635,12 +666,7 @@ class LinuxNode(ResourceManager):
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
-    def install_packages(self, packages, home, run_home = None):
-        """ Install packages in the Linux host.
-
-        'home' is the directory to upload the package installation script.
-        'run_home' is the directory from where to execute the script.
-        """
+    def install_packages_command(self, packages):
         command = ""
         if self.use_rpm:
             command = rpmfuncs.install_packages_command(self.os, packages)
@@ -651,6 +677,16 @@ class LinuxNode(ResourceManager):
             self.error(msg, self.os)
             raise RuntimeError, msg
 
+        return command
+
+    def install_packages(self, packages, home, run_home = None):
+        """ Install packages in the Linux host.
+
+        'home' is the directory to upload the package installation script.
+        'run_home' is the directory from where to execute the script.
+        """
+        command = self.install_packages_command(packages)
+
         run_home = run_home or home
 
         (out, err), proc = self.run_and_wait(command, run_home, 
@@ -754,18 +790,19 @@ class LinuxNode(ResourceManager):
         # wait until command finishes to execute
         self.wait_run(pid, ppid)
       
-        (out, err), proc = self.check_errors(home,
+        (eout, err), proc = self.check_errors(home,
             ecodefile = ecodefile,
-            stdout = stdout,
-            stderr= stderr)
+            stderr = stderr)
 
         # Out is what was written in the stderr file
         if err:
             msg = " Failed to run command '%s' " % command
-            self.error(msg, out, err)
+            self.error(msg, eout, err)
 
             if raise_on_error:
                 raise RuntimeError, msg
+
+        (out, oerr), proc = self.check_output(home, stdout)
         
         return (out, err), proc
 
@@ -818,8 +855,8 @@ class LinuxNode(ResourceManager):
         return self.upload(command, shfile, text = True, overwrite = overwrite)
 
     def format_environment(self, env, inline = False):
-        """Format environmental variables for command to be executed either
-        as an inline command
+        """ Formats the environment variables for a command to be executed
+        either as an inline command
         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
         """
@@ -833,18 +870,14 @@ class LinuxNode(ResourceManager):
 
     def check_errors(self, home, 
             ecodefile = "exitcode", 
-            stdout = "stdout",
             stderr = "stderr"):
-        """
-        Checks whether errors occurred while running a command.
+        """ Checks whether errors occurred while running a command.
         It first checks the exit code for the command, and only if the
         exit code is an error one it returns the error output.
 
         """
         proc = None
         err = ""
-        # retrive standard output from the file
-        (out, oerr), oproc = self.check_output(home, stdout)
 
         # get exit code saved in the 'exitcode' file
         ecode = self.exitcode(home, ecodefile)
@@ -862,7 +895,7 @@ class LinuxNode(ResourceManager):
             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
                 err = "" 
             
-        return (out, err), proc
+        return ("", err), proc
  
     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
         """ Waits until the pid file for the command is generated, 
@@ -870,7 +903,7 @@ class LinuxNode(ResourceManager):
         pid = ppid = None
         delay = 1.0
 
-        for i in xrange(4):
+        for i in xrange(2):
             pidtuple = self.getpid(home = home, pidfile = pidfile)
             
             if pidtuple:
@@ -891,7 +924,7 @@ class LinuxNode(ResourceManager):
 
     def wait_run(self, pid, ppid, trial = 0):
         """ wait for a remote process to finish execution """
-        start_delay = 1.0
+        delay = 1.0
 
         while True:
             status = self.status(pid, ppid)
@@ -922,36 +955,49 @@ class LinuxNode(ResourceManager):
             return True
 
         out = err = ""
+        msg = "Unresponsive host. Wrong answer. "
+
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work arround this, repeat the operation N times or
+        # until the result is not empty string
         try:
             (out, err), proc = self.execute("echo 'ALIVE'",
-                    retry = 5, 
+                    blocking = True,
                     with_lock = True)
+    
+            if out.find("ALIVE") > -1:
+                return True
         except:
             trace = traceback.format_exc()
-            msg = "Unresponsive host  %s " % err
-            self.error(msg, out, trace)
-            return False
+            msg = "Unresponsive host. Error reaching host: %s " % trace
 
-        if out.strip() == "ALIVE":
-            return True
-        else:
-            msg = "Unresponsive host "
-            self.error(msg, out, err)
-            return False
+        self.error(msg, out, err)
+        return False
 
     def find_home(self):
         """ Retrieves host home directory
         """
-        (out, err), proc = self.execute("echo ${HOME}", retry = 5, 
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work arround this, repeat the operation N times or
+        # until the result is not empty string
+        msg = "Impossible to retrieve HOME directory"
+        try:
+            (out, err), proc = self.execute("echo ${HOME}",
+                    blocking = True,
                     with_lock = True)
+    
+            if out.strip() != "":
+                self._home_dir =  out.strip()
+        except:
+            trace = traceback.format_exc()
+            msg = "Impossible to retrieve HOME directory" % trace
 
-        if proc.poll():
-            msg = "Imposible to retrieve HOME directory"
+        if not self._home_dir:
             self.error(msg, out, err)
             raise RuntimeError, msg
 
-        self._home_dir =  out.strip()
-
     def filter_existing_files(self, src, dst):
         """ Removes files that already exist in the Linux host from src list
         """