Modified FailureManager to abort only when critical resources fail
[nepi.git] / src / nepi / resources / linux / node.py
index 9188022..953793a 100644 (file)
@@ -17,9 +17,9 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        reschedule_delay
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux import rpmfuncs, debfuncs 
 from nepi.util import sshfuncs, execfuncs
 from nepi.util.sshfuncs import ProcStatus
@@ -57,7 +57,7 @@ class OSType:
     UBUNTU = "ubuntu"
     DEBIAN = "debian"
 
-@clsinit
+@clsinit_copy
 class LinuxNode(ResourceManager):
     """
     .. class:: Class Args :
@@ -142,6 +142,9 @@ class LinuxNode(ResourceManager):
 
     """
     _rtype = "LinuxNode"
+    _help = "Controls Linux host machines ( either localhost or a host " \
+            "that can be accessed using a SSH key)"
+    _backend_type = "linux"
 
     @classmethod
     def _register_attributes(cls):
@@ -165,14 +168,20 @@ class LinuxNode(ResourceManager):
         
         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
                 " from node home folder before starting experiment", 
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
 
         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
                 " from a previous same experiment, before the new experiment starts", 
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
         
         clean_processes = Attribute("cleanProcesses", 
                 "Kill all running processes before starting experiment",
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
@@ -196,8 +205,14 @@ class LinuxNode(ResourceManager):
         # home directory at Linux host
         self._home_dir = ""
         
-        # lock to avoid concurrency issues on methods used by applications 
-        self._lock = threading.Lock()
+        # Lock that prevents concurrent applications on the same node
+        # from executing commands at the same time. There are potential
+        # concurrency issues when using SSH to the same host from
+        # multiple threads. There are also possible operational
+        # issues, e.g. an application checking for the existence
+        # of a file or folder prior to creating it, while another
+        # application creates the same file or folder in between.
+        self._node_lock = threading.Lock()
     
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
@@ -300,7 +315,6 @@ class LinuxNode(ResourceManager):
             time.sleep(min(30.0, retrydelay))
             retrydelay *= 1.5
 
-
     @property
     def use_deb(self):
         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
@@ -314,11 +328,10 @@ class LinuxNode(ResourceManager):
     def localhost(self):
         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
 
+    @failtrap
     def provision(self):
         # check if host is alive
         if not self.is_alive():
-            self.fail()
-            
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
             raise RuntimeError, msg
@@ -345,14 +358,12 @@ class LinuxNode(ResourceManager):
 
         super(LinuxNode, self).provision()
 
+    @failtrap
     def deploy(self):
         if self.state == ResourceState.NEW:
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self._state = ResourceState.FAILED
-                raise
+            self.info("Deploying node")
+            self.discover()
+            self.provision()
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
@@ -366,19 +377,24 @@ class LinuxNode(ResourceManager):
         super(LinuxNode, self).deploy()
 
     def release(self):
-        # Node needs to wait until all associated RMs are released
-        # to be released
-        rms = self.get_connected()
-        for rm in rms:
-            if rm.state < ResourceState.STOPPED:
-                self.ec.schedule(reschedule_delay, self.release)
-                return 
-
-        tear_down = self.get("tearDown")
-        if tear_down:
-            self.execute(tear_down)
+        try:
+            rms = self.get_connected()
+            for rm in rms:
+                # Node needs to wait until all associated RMs are released
+                # before it can be released
+                if rm.state < ResourceState.STOPPED:
+                    self.ec.schedule(reschedule_delay, self.release)
+                    return 
+
+            tear_down = self.get("tearDown")
+            if tear_down:
+                self.execute(tear_down)
 
-        self.clean_processes()
+            self.clean_processes()
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(LinuxNode, self).release()
 
@@ -456,7 +472,7 @@ class LinuxNode(ResourceManager):
                     env = env)
         else:
             if with_lock:
-                with self._lock:
+                with self._node_lock:
                     (out, err), proc = sshfuncs.rexec(
                         command, 
                         host = self.get("hostname"),
@@ -524,7 +540,7 @@ class LinuxNode(ResourceManager):
                     sudo = sudo,
                     user = user) 
         else:
-            with self._lock:
+            with self._node_lock:
                 (out, err), proc = sshfuncs.rspawn(
                     command,
                     pidfile = pidfile,
@@ -549,7 +565,7 @@ class LinuxNode(ResourceManager):
         if self.localhost:
             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
         else:
-            with self._lock:
+            with self._node_lock:
                 pidtuple = sshfuncs.rgetpid(
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
@@ -566,7 +582,7 @@ class LinuxNode(ResourceManager):
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
         else:
-            with self._lock:
+            with self._node_lock:
                 status = sshfuncs.rstatus(
                         pid, ppid,
                         host = self.get("hostname"),
@@ -588,7 +604,7 @@ class LinuxNode(ResourceManager):
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
-                with self._lock:
+                with self._node_lock:
                     (out, err), proc = sshfuncs.rkill(
                         pid, ppid,
                         host = self.get("hostname"),
@@ -608,7 +624,7 @@ class LinuxNode(ResourceManager):
                     recursive = True,
                     strict_host_checking = False)
         else:
-            with self._lock:
+            with self._node_lock:
                 (out, err), proc = sshfuncs.rcopy(
                     src, dst, 
                     port = self.get("port"),
@@ -619,7 +635,6 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
-
     def upload(self, src, dst, text = False, overwrite = True):
         """ Copy content to destination