Merging ns-3 into nepi-3-dev
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 17 Feb 2014 18:53:45 +0000 (19:53 +0100)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 17 Feb 2014 18:53:45 +0000 (19:53 +0100)
1  2 
src/nepi/execution/resource.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/ccnd.py
src/nepi/resources/linux/node.py
src/nepi/resources/planetlab/node.py
src/nepi/resources/planetlab/openvswitch/tunnel.py
src/nepi/resources/planetlab/tap.py
src/nepi/util/execfuncs.py
src/nepi/util/sshfuncs.py

@@@ -531,8 -530,8 +530,8 @@@ class ResourceManager(Logger)
                  import traceback
                  err = traceback.format_exc()
                  self.error(err)
 -            self.set_released()
-                 self.debug("----- RELEASED ---- ")
 +                self.set_released()
  
      def fail(self):
          """ Sets the RM to state FAILED.
          connected = []
          rclass = ResourceFactory.get_resource_type(rtype)
          for guid in self.connections:
 -            rm = self.ec.get_resource(guid)
 +            rm = self.ec.get_resource(guid)
              if not rtype or isinstance(rm, rclass):
                  connected.append(rm)
          return connected
          self.set_ready()
  
      def do_release(self):
 -        pass
 +        self.set_released()
-         self.debug("----- RELEASED ---- ")
  
      def do_fail(self):
          self.set_failed()
      def set_started(self):
          """ Mark ResourceManager as STARTED """
          self.set_state(ResourceState.STARTED, "_start_time")
-         
+         self.debug("----- STARTED ---- ")
 -        
++
      def set_stopped(self):
          """ Mark ResourceManager as STOPPED """
          self.set_state(ResourceState.STOPPED, "_stop_time")
@@@ -90,18 -90,18 +90,18 @@@ class LinuxApplication(ResourceManager)
          command = Attribute("command", "Command to execute at application start. "
                  "Note that commands will be executed in the ${RUN_HOME} directory, "
                  "make sure to take this into account when using relative paths. ", 
-                 flags = Flags.ExecReadOnly)
+                 flags = Flags.Design)
          forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
-                 flags = Flags.ExecReadOnly)
+                 flags = Flags.Design)
          env = Attribute("env", "Environment variables string for command execution",
-                 flags = Flags.ExecReadOnly)
+                 flags = Flags.Design)
          sudo = Attribute("sudo", "Run with root privileges", 
-                 flags = Flags.ExecReadOnly)
+                 flags = Flags.Design)
          depends = Attribute("depends", 
                  "Space-separated list of packages required to run the application",
-                 flags = Flags.ExecReadOnly)
+                 flags = Flags.Design)
          sources = Attribute("sources", 
 -                "Space-separated list of regular files to be uploaded to ${SRC} "
 +                "semi-colon separated list of regular files to be uploaded to ${SRC} "
                  "directory prior to building. Archives won't be expanded automatically. "
                  "Sources are globally available for all experiments unless "
                  "cleanHome is set to True (This will delete all sources). ",
Simple merge
@@@ -186,14 -186,8 +186,14 @@@ class LinuxNode(ResourceManager)
          
          tear_down = Attribute("tearDown", "Bash script to be executed before " + \
                  "releasing the resource",
-                 flags = Flags.ExecReadOnly)
+                 flags = Flags.Design)
  
-                 flags = Flags.ExecReadOnly)
 +        gateway_user = Attribute("gatewayUser", "Gateway account username",
-                 flags = Flags.ExecReadOnly)
++                flags = Flags.Design)
 +
 +        gateway = Attribute("gateway", "Hostname of the gateway machine",
++                flags = Flags.Design)
 +
          cls._register_attribute(hostname)
          cls._register_attribute(username)
          cls._register_attribute(port)
                          host = self.get("hostname"),
                          user = self.get("username"),
                          port = self.get("port"),
 +                        gwuser = self.get("gatewayUser"),
 +                        gw = self.get("gateway"),
                          agent = True,
                          sudo = sudo,
-                         stdin = stdin,
                          identity = self.get("identity"),
                          server_key = self.get("serverKey"),
                          env = env,
                      host = self.get("hostname"),
                      user = self.get("username"),
                      port = self.get("port"),
 +                    gwuser = self.get("gatewayUser"),
 +                    gw = self.get("gateway"),
                      agent = True,
                      sudo = sudo,
-                     stdin = stdin,
                      identity = self.get("identity"),
                      server_key = self.get("serverKey"),
                      env = env,
Simple merge
@@@ -84,9 -78,8 +84,9 @@@ class OVSTunnel(LinuxApplication)
                  "Specifies the interface's emulated bandwidth in bytes "
                  "per second.",
                  type = Types.Integer, 
-                 flags = Flags.ExecReadOnly)
+                 flags = Flags.Design)
  
 +        cls._register_attribute(network)
          cls._register_attribute(cipher)
          cls._register_attribute(cipher_key)
          cls._register_attribute(txqueuelen)
Simple merge
@@@ -60,22 -58,15 +58,22 @@@ def lcopy(source, dest, recursive = Fal
      command = ["cp"]
      if recursive:
          command.append("-R")
 -    
 -    command.append(src)
 -    command.append(dst)
 -    
 +  
 +    if isinstance(dest, str):
 +        dest = dest.split(";")
 +
 +    if isinstance(src, str):
 +        src = src.split(";")
 +           
 +    args.extend(src)
 +
 +    args.extend(dest)
 +
-     p = subprocess.Popen(command, 
+     proc = subprocess.Popen(command, 
          stdout=subprocess.PIPE, 
          stderr=subprocess.PIPE)
  
--    out, err = p.communicate()
++    out, err = proc.communicate()
      return ((out, err), proc)
     
  def lspawn(command, pidfile, 
@@@ -205,12 -207,9 +207,11 @@@ def eintr_retry(func)
      return rv
  
  def rexec(command, host, user, 
 -        port = None, 
 +        port = None,
 +        gwuser = None,
 +        gw = None, 
          agent = True,
          sudo = False,
-         stdin = None,
          identity = None,
          server_key = None,
          env = None,
          command = "sudo " + command
  
      args.append(command)
-     for x in xrange(retry):
-         # connects to the remote host and starts a remote connection
-         proc = subprocess.Popen(args,
-                 env = env,
-                 stdout = subprocess.PIPE,
-                 stdin = subprocess.PIPE, 
-                 stderr = subprocess.PIPE)
-        
-         # attach tempfile object to the process, to make sure the file stays
-         # alive until the process is finished with it
-         proc._known_hosts = tmp_known_hosts
      
-         # by default, rexec calls _communicate which will block 
-         # until the process has exit. The argument block == False 
-         # forces to rexec to return immediately, without blocking 
-         try:
-             if blocking:
-                 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
-             else:
-                 err = proc.stderr.read()
-                 out = proc.stdout.read()
-             msg = " rexec - host %s - command %s " % (host, " ".join(args))
-             log(msg, logging.DEBUG, out, err)
-             if proc.poll():
-                 skip = False
-                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
-                     # SSH error, can safely retry
-                     skip = True 
-                 elif retry:
-                     # Probably timed out or plain failed but can retry
-                     skip = True 
-                 
-                 if skip:
-                     t = x*2
-                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
-                             t, x, host, " ".join(args))
-                     log(msg, logging.DEBUG)
+     log_msg = " rexec - host %s - command %s " % (host, " ".join(args))
  
-                     time.sleep(t)
-                     continue
-             break
-         except RuntimeError, e:
-             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
-             log(msg, logging.DEBUG, out, err)
+     stdout = stderr = stdin = subprocess.PIPE
+     if forward_x11:
+         stdout = stderr = stdin = None
  
-             if retry <= 0:
-                 raise
-             retry -= 1
-         
-     return ((out, err), proc)
+     return _retry_rexec(args, log_msg, 
+             stderr = stderr,
+             stdin = stdin,
+             stdout = stdout,
+             env = env, 
+             retry = retry, 
+             tmp_known_hosts = tmp_known_hosts,
+             blocking = blocking)
  
  def rcopy(source, dest,
 -        port = None, 
 +        port = None,
 +        gwuser = None,
 +        gw = None,
          agent = True, 
          recursive = False,
          identity = None,
      if not strict_host_checking:
          # Do not check for Host key. Unsafe.
          args.extend(['-o', 'StrictHostKeyChecking=no'])
-     if openssh_has_persist():
-         args.extend([
-             '-o', 'ControlMaster=auto',
-             '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
-             ])
-     if isinstance(dest, str):
-         dest = map(str.strip, dest.split(";"))
-     if isinstance(source, str):
-         source = map(str.strip, source.split(";"))
-     args.extend(source)
-     args.extend(dest)
--
-     for x in xrange(retry):
-         # connects to the remote host and starts a remote connection
-         proc = subprocess.Popen(args,
-                 stdout = subprocess.PIPE,
-                 stdin = subprocess.PIPE, 
-                 stderr = subprocess.PIPE)
-         
-         # attach tempfile object to the process, to make sure the file stays
-         # alive until the process is finished with it
-         proc._known_hosts = tmp_known_hosts
 +    
-         try:
-             (out, err) = proc.communicate()
-             eintr_retry(proc.wait)()
-             msg = " rcopy - host %s - command %s " % (host, " ".join(args))
-             log(msg, logging.DEBUG, out, err)
-             if proc.poll():
-                 t = x*2
-                 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
-                         t, x, host, " ".join(args))
-                 log(msg, logging.DEBUG)
-                 time.sleep(t)
-                 continue
+     if isinstance(source, list):
+         args.extend(source)
+     else:
+         if openssh_has_persist():
+             args.extend([
+                 '-o', 'ControlMaster=auto',
+                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
+                 ])
+         args.append(source)
  
-             break
-         except RuntimeError, e:
-             msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
-             log(msg, logging.DEBUG, out, err)
+     args.append(dest)
  
-             if retry <= 0:
-                 raise
-             retry -= 1
-         
-     return ((out, err), proc)
+     log_msg = " rcopy - host %s - command %s " % (host, " ".join(args))
+     
+     return _retry_rexec(args, log_msg, env = None, retry = retry, 
+             tmp_known_hosts = tmp_known_hosts,
+             blocking = True)
  
  def rspawn(command, pidfile, 
          stdout = '/dev/null', 
  
      return (out, err), proc
  
- # POSIX
- def _communicate(proc, input, timeout=None, err_on_timeout=True):
-     read_set = []
-     write_set = []
-     stdout = None # Return
-     stderr = None # Return
-     
-     killed = False
-     
-     if timeout is not None:
-         timelimit = time.time() + timeout
-         killtime = timelimit + 4
-         bailtime = timelimit + 4
-     if proc.stdin:
-         # Flush stdio buffer.  This might block, if the user has
-         # been writing to .stdin in an uncontrolled fashion.
-         proc.stdin.flush()
-         if input:
-             write_set.append(proc.stdin)
-         else:
-             proc.stdin.close()
-     if proc.stdout:
-         read_set.append(proc.stdout)
-         stdout = []
-     if proc.stderr:
-         read_set.append(proc.stderr)
-         stderr = []
-     input_offset = 0
-     while read_set or write_set:
-         if timeout is not None:
-             curtime = time.time()
-             if timeout is None or curtime > timelimit:
-                 if curtime > bailtime:
-                     break
-                 elif curtime > killtime:
-                     signum = signal.SIGKILL
-                 else:
-                     signum = signal.SIGTERM
-                 # Lets kill it
-                 os.kill(proc.pid, signum)
-                 select_timeout = 0.5
-             else:
-                 select_timeout = timelimit - curtime + 0.1
-         else:
-             select_timeout = 1.0
+ def _retry_rexec(args,
+         log_msg,
+         stdout = subprocess.PIPE,
+         stdin = subprocess.PIPE, 
+         stderr = subprocess.PIPE,
+         env = None,
+         retry = 3,
+         tmp_known_hosts = None,
+         blocking = True):
+     for x in xrange(retry):
+         # connects to the remote host and starts a remote connection
+         proc = subprocess.Popen(args,
+                 env = env,
+                 stdout = stdout,
+                 stdin = stdin, 
+                 stderr = stderr)
          
-         if select_timeout > 1.0:
-             select_timeout = 1.0
-             
+         # attach tempfile object to the process, to make sure the file stays
+         # alive until the process is finished with it
+         proc._known_hosts = tmp_known_hosts
+     
+         # The argument block == False forces to rexec to return immediately, 
+         # without blocking 
          try:
-             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
-         except select.error,e:
-             if e[0] != 4:
-                 raise
-             else:
-                 continue
-         
-         if not rlist and not wlist and not xlist and proc.poll() is not None:
-             # timeout and process exited, say bye
+             err = out = " "
+             if blocking:
+                 (out, err) = proc.communicate()
+             elif stdout:
+                 out = proc.stdout.read()
+                 if proc.poll() and stderr:
+                     err = proc.stderr.read()
+             log(log_msg, logging.DEBUG, out, err)
+             if proc.poll():
+                 skip = False
+                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
+                     # SSH error, can safely retry
+                     skip = True 
+                 elif retry:
+                     # Probably timed out or plain failed but can retry
+                     skip = True 
+                 
+                 if skip:
+                     t = x*2
+                     msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
+                             t, x, " ".join(args))
+                     log(msg, logging.DEBUG)
+                     time.sleep(t)
+                     continue
              break
+         except RuntimeError, e:
+             msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
+             log(msg, logging.DEBUG, out, err)
  
-         if proc.stdin in wlist:
-             # When select has indicated that the file is writable,
-             # we can write up to PIPE_BUF bytes without risk
-             # blocking.  POSIX defines PIPE_BUF >= 512
-             bytes_written = os.write(proc.stdin.fileno(),
-                     buffer(input, input_offset, 512))
-             input_offset += bytes_written
-             if input_offset >= len(input):
-                 proc.stdin.close()
-                 write_set.remove(proc.stdin)
-         if proc.stdout in rlist:
-             data = os.read(proc.stdout.fileno(), 1024)
-             if data == "":
-                 proc.stdout.close()
-                 read_set.remove(proc.stdout)
-             stdout.append(data)
-         if proc.stderr in rlist:
-             data = os.read(proc.stderr.fileno(), 1024)
-             if data == "":
-                 proc.stderr.close()
-                 read_set.remove(proc.stderr)
-             stderr.append(data)
-     
-     # All data exchanged.  Translate lists into strings.
-     if stdout is not None:
-         stdout = ''.join(stdout)
-     if stderr is not None:
-         stderr = ''.join(stderr)
-     # Translate newlines, if requested.  We cannot let the file
-     # object do the translation: It is based on stdio, which is
-     # impossible to combine with select (unless forcing no
-     # buffering).
-     if proc.universal_newlines and hasattr(file, 'newlines'):
-         if stdout:
-             stdout = proc._translate_newlines(stdout)
-         if stderr:
-             stderr = proc._translate_newlines(stderr)
-     if killed and err_on_timeout:
-         errcode = proc.poll()
-         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
-     else:
-         if killed:
-             proc.poll()
-         else:
-             proc.wait()
-         return (stdout, stderr)
+             if retry <= 0:
+                 raise
+             retry -= 1
+         
+     return ((out, err), proc)
  
 -