Added Linux Application
[nepi.git] / src / neco / resources / linux / node.py
index b482747..a030eb4 100644 (file)
@@ -13,6 +13,9 @@ import time
 import threading
 
 # TODO: Verify files and dirs exists already
+# TODO: Blacklist node!
+
+DELAY ="1s"
 
 @clsinit
 class LinuxNode(ResourceManager):
@@ -20,32 +23,35 @@ class LinuxNode(ResourceManager):
 
     @classmethod
     def _register_attributes(cls):
-        hostname = Attribute("hostname", "Hostname of the machine")
+        hostname = Attribute("hostname", "Hostname of the machine",
+                flags = Flags.ExecReadOnly)
 
         username = Attribute("username", "Local account username", 
                 flags = Flags.Credential)
 
-        port = Attribute("port", "SSH port", flags = Flags.Credential)
+        port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
         
-        home = Attribute("home", 
-                "Experiment home directory to store all experiment related files")
+        home = Attribute("home",
+                "Experiment home directory to store all experiment related files",
+                flags = Flags.ExecReadOnly)
         
         identity = Attribute("identity", "SSH identity file",
                 flags = Flags.Credential)
         
         server_key = Attribute("serverKey", "Server public key", 
-                flags = Flags.Credential)
+                flags = Flags.ExecReadOnly)
         
         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
                 " from home folder before starting experiment", 
-                flags = Flags.ReadOnly)
+                flags = Flags.ExecReadOnly)
         
         clean_processes = Attribute("cleanProcesses", 
-                "Kill all running processes before starting experiment", 
-                flags = Flags.ReadOnly)
+                "Kill all running processes before starting experiment",
+                flags = Flags.ExecReadOnly)
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
-                "releasing the resource", flags = Flags.ReadOnly)
+                "releasing the resource",
+                flags = Flags.ExecReadOnly)
 
         cls._register_attribute(hostname)
         cls._register_attribute(username)
@@ -60,7 +66,6 @@ class LinuxNode(ResourceManager):
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
         self._os = None
-        self._home = "nepi-exp-%s" % os.urandom(8).encode('hex')
         
         # lock to avoid concurrency issues on methods used by applications 
         self._lock = threading.Lock()
@@ -69,10 +74,17 @@ class LinuxNode(ResourceManager):
 
     @property
     def home(self):
-        home = self.get("home")
-        if home and not home.startswith("nepi-"):
-            home = "nepi-" + home
-        return home or self._home
+        return self.get("home") or "/tmp"
+
+    @property
+    def exp_dir(self):
+        exp_dir = os.path.join(self.home, self.ec.exp_id)
+        return exp_dir if exp_dir.startswith('/') else "${HOME}/"
+
+    @property
+    def node_dir(self):
+        node_dir = "node-%d" % self.guid
+        return os.path.join(self.exp_dir, node_dir)
 
     @property
     def os(self):
@@ -116,17 +128,29 @@ class LinuxNode(ResourceManager):
             self.logger.error("Deploy failed. Unresponsive node")
             return
 
-    def deploy(self):
-        self.provision()
-
         if self.get("cleanProcesses"):
             self.clean_processes()
 
         if self.get("cleanHome"):
-            # self.clean_home() -> this is dangerous
-            pass
+            self.clean_home()
+       
+        self.mkdir(self.node_dir)
 
-        self.mkdir(self.home)
+        super(LinuxNode, self).provision()
+
+    def deploy(self):
+        if self.state == ResourceState.NEW:
+            self.discover()
+            self.provision()
+
+        # Node needs to wait until all associated interfaces are 
+        # ready before it can finalize deployment
+        from neco.resources.linux.interface import LinuxInterface
+        ifaces = self.get_connected(LinuxInterface.rtype())
+        for iface in ifaces:
+            if iface.state < ResourceState.READY:
+                self.ec.schedule(DELAY, self.deploy)
+                return 
 
         super(LinuxNode, self).deploy()
 
@@ -137,7 +161,7 @@ class LinuxNode(ResourceManager):
 
         super(LinuxNode, self).release()
 
-    def validate_connection(self, guid):
+    def valid_connection(self, guid):
         # TODO: Validate!
         return True
 
@@ -152,38 +176,31 @@ class LinuxNode(ResourceManager):
 
         out = err = ""
         with self._lock:
-           (out, err), proc = self.run_and_wait(cmd, self.home, 
-                pidfile = "cppid",
-                stdout = "cplog", 
-                stderr = "cperr", 
-                raise_on_error = True)
-
-        return (out, err)   
+           (out, err), proc = self.execute(cmd) 
             
     def clean_home(self):
         self.logger.info("Cleaning up home")
 
-        cmd = "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + "
+        cmd = ("cd %s ; " % self.home +
+            "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
+            " -execdir rm -rf {} + ")
 
         out = err = ""
         with self._lock:
-            (out, err), proc = self.run_and_wait(cmd, self.home,
-                pidfile = "chpid",
-                stdout = "chlog", 
-                stderr = "cherr", 
-                raise_on_error = True)
-        
-        return (out, err)   
+            (out, err), proc = self.execute(cmd)
 
-    def upload(self, src, dst):
+    def upload(self, src, dst, text = False):
         """ Copy content to destination
 
-           src  content to copy. Can be a local file, directory or text input
+           src  content to copy. Can be a local file, directory or a list of files
 
            dst  destination path on the remote host (remote is always self.host)
+
+           text src is text input, it must be stored into a temp file before uploading
         """
         # If source is a string input 
-        if not os.path.isfile(src):
+        f = None
+        if text and not os.path.isfile(src):
             # src is text input that should be uploaded as file
             # create a temporal file with the content to upload
             f = tempfile.NamedTemporaryFile(delete=False)
@@ -195,7 +212,13 @@ class LinuxNode(ResourceManager):
             # Build destination as <user>@<server>:<path>
             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
 
-        return self.copy(src, dst)
+        result = self.copy(src, dst)
+
+        # clean up temp file
+        if f:
+            os.remove(f.name)
+
+        return result
 
     def download(self, src, dst):
         if not self.localhost:
@@ -203,7 +226,9 @@ class LinuxNode(ResourceManager):
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
-    def install_packages(self, packages):
+    def install_packages(self, packages, home = None):
+        home = home or self.node_dir
+
         cmd = ""
         if self.os in ["f12", "f14"]:
             cmd = rpmfuncs.install_packages_command(self.os, packages)
@@ -217,15 +242,17 @@ class LinuxNode(ResourceManager):
 
         out = err = ""
         with self._lock:
-            (out, err), proc = self.run_and_wait(cmd, self.home, 
-                pidfile = "instpkgpid",
-                stdout = "instpkglog", 
-                stderr = "instpkgerr", 
+            (out, err), proc = self.run_and_wait(cmd, home, 
+                pidfile = "instpkg_pid",
+                stdout = "instpkg_log", 
+                stderr = "instpkg_err", 
                 raise_on_error = True)
 
         return (out, err), proc 
 
-    def remove_packages(self, packages):
+    def remove_packages(self, packages, home = None):
+        home = home or self.node_dir
+
         cmd = ""
         if self.os in ["f12", "f14"]:
             cmd = rpmfuncs.remove_packages_command(self.os, packages)
@@ -239,10 +266,10 @@ class LinuxNode(ResourceManager):
 
         out = err = ""
         with self._lock:
-            (out, err), proc = self.run_and_wait(cmd, self.home, 
-                pidfile = "rmpkgpid",
-                stdout = "rmpkglog", 
-                stderr = "rmpkgerr", 
+            (out, err), proc = self.run_and_wait(cmd, home, 
+                pidfile = "rmpkg_pid",
+                stdout = "rmpkg_log", 
+                stderr = "rmpkg_err", 
                 raise_on_error = True)
          
         return (out, err), proc 
@@ -264,7 +291,13 @@ class LinuxNode(ResourceManager):
             stderr = 'stderr', 
             sudo = False,
             raise_on_error = False):
-
+        """ runs a command in background on the remote host, but waits
+            until the command finishes execution.
+            This is more robust than doing a simple synchronized 'execute',
+            since in the remote host the command can continue to run detached
+            even if network disconnections occur
+        """
+        # run command in background in remote host
         (out, err), proc = self.run(command, home, 
                 pidfile = pidfile,
                 stdin = stdin, 
@@ -272,21 +305,25 @@ class LinuxNode(ResourceManager):
                 stderr = stderr, 
                 sudo = sudo)
 
+        # check no errors occurred
         if proc.poll() and err:
             msg = " Failed to run command %s on host %s" % (
                     command, self.get("hostname"))
             self.logger.error(msg)
             if raise_on_error:
                 raise RuntimeError, msg
-        
+
+        # Wait for pid file to be generated
         pid, ppid = self.wait_pid(
                 home = home, 
                 pidfile = pidfile, 
                 raise_on_error = raise_on_error)
 
+        # wait until command finishes to execute
         self.wait_run(pid, ppid)
-        
-        (out, err), proc = self.check_run_error(home, stderr)
+       
+        # check if execution errors occurred
+        (out, err), proc = self.check_output(home, stderr)
 
         if err or out:
             msg = "Error while running command %s on host %s. error output: %s" % (
@@ -301,6 +338,8 @@ class LinuxNode(ResourceManager):
         return (out, err), proc
  
     def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+        """ Waits until the pid file for the command is generated, 
+            and returns the pid and ppid of the process """
         pid = ppid = None
         delay = 1.0
         for i in xrange(5):
@@ -322,6 +361,7 @@ class LinuxNode(ResourceManager):
         return pid, ppid
 
     def wait_run(self, pid, ppid, trial = 0):
+        """ wait for a remote process to finish execution """
         delay = 1.0
         first = True
         bustspin = 0
@@ -344,17 +384,12 @@ class LinuxNode(ResourceManager):
                 delay = min(30,delay*1.2)
                 bustspin = 0
 
-    def check_run_error(self, home, stderr = 'stderr'):
+    def check_output(self, home, filename):
+        """ checks file content """
         (out, err), proc = self.execute("cat %s" % 
-                os.path.join(home, stderr))
+                os.path.join(home, filename))
         return (out, err), proc
 
-    def check_run_output(self, home, stdout = 'stdout'):
-        (out, err), proc = self.execute("cat %s" % 
-                os.path.join(home, stdout))
-        return (out, err), proc
-
-
     def is_alive(self):
         if self.localhost:
             return True
@@ -575,7 +610,7 @@ class LinuxNode(ResourceManager):
                             self.logger.error("%s. out: %s error: %s", fail_msg, out, err)
                     break
                 except RuntimeError, e:
-                    if x >= 3:
+                    if i >= 3:
                         self.logger.error("%s. error: %s", fail_msg, e.args)
             return (out, err), proc