From: Alina Quereilhac Date: Mon, 17 Feb 2014 18:53:45 +0000 (+0100) Subject: Merging ns-3 into nepi-3-dev X-Git-Tag: nepi-3.1.0~120 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=2efd5eabeba8a6577ace9132d6336d44be0510e8;p=nepi.git Merging ns-3 into nepi-3-dev --- 2efd5eabeba8a6577ace9132d6336d44be0510e8 diff --cc src/nepi/execution/resource.py index 49b842be,54af5e36..5ad084da --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@@ -531,8 -530,8 +530,8 @@@ class ResourceManager(Logger) import traceback err = traceback.format_exc() self.error(err) + - self.set_released() + self.set_released() - self.debug("----- RELEASED ---- ") def fail(self): """ Sets the RM to state FAILED. @@@ -676,7 -696,8 +696,8 @@@ connected = [] rclass = ResourceFactory.get_resource_type(rtype) for guid in self.connections: - rm = self.ec.get_resource(guid) + + rm = self.ec.get_resource(guid) if not rtype or isinstance(rm, rclass): connected.append(rm) return connected @@@ -945,8 -966,7 +966,7 @@@ self.set_ready() def do_release(self): - pass + self.set_released() - self.debug("----- RELEASED ---- ") def do_fail(self): self.set_failed() @@@ -954,7 -974,8 +974,8 @@@ def set_started(self): """ Mark ResourceManager as STARTED """ self.set_state(ResourceState.STARTED, "_start_time") - + self.debug("----- STARTED ---- ") - ++ def set_stopped(self): """ Mark ResourceManager as STOPPED """ self.set_state(ResourceState.STOPPED, "_stop_time") diff --cc src/nepi/resources/linux/application.py index 6f6efa93,a1a9f972..d391086c --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@@ -90,18 -90,18 +90,18 @@@ class LinuxApplication(ResourceManager) command = Attribute("command", "Command to execute at application start. " "Note that commands will be executed in the ${RUN_HOME} directory, " "make sure to take this into account when using relative paths. ", - flags = Flags.ExecReadOnly) + flags = Flags.Design) forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", - flags = Flags.ExecReadOnly) + flags = Flags.Design) env = Attribute("env", "Environment variables string for command execution", - flags = Flags.ExecReadOnly) + flags = Flags.Design) sudo = Attribute("sudo", "Run with root privileges", - flags = Flags.ExecReadOnly) + flags = Flags.Design) depends = Attribute("depends", "Space-separated list of packages required to run the application", - flags = Flags.ExecReadOnly) + flags = Flags.Design) sources = Attribute("sources", - "Space-separated list of regular files to be uploaded to ${SRC} " + "semi-colon separated list of regular files to be uploaded to ${SRC} " "directory prior to building. Archives won't be expanded automatically. " "Sources are globally available for all experiments unless " "cleanHome is set to True (This will delete all sources). ", diff --cc src/nepi/resources/linux/node.py index 029ee8ad,ff104aa5..f9ae091c --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@@ -186,14 -186,8 +186,14 @@@ class LinuxNode(ResourceManager) tear_down = Attribute("tearDown", "Bash script to be executed before " + \ "releasing the resource", - flags = Flags.ExecReadOnly) + flags = Flags.Design) + gateway_user = Attribute("gatewayUser", "Gateway account username", - flags = Flags.ExecReadOnly) ++ flags = Flags.Design) + + gateway = Attribute("gateway", "Hostname of the gateway machine", - flags = Flags.ExecReadOnly) ++ flags = Flags.Design) + cls._register_attribute(hostname) cls._register_attribute(username) cls._register_attribute(port) @@@ -483,11 -455,8 +483,10 @@@ host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, sudo = sudo, - stdin = stdin, identity = self.get("identity"), server_key = self.get("serverKey"), env = env, @@@ -507,11 -474,8 +504,10 @@@ host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, sudo = sudo, - stdin = stdin, identity = self.get("identity"), server_key = self.get("serverKey"), env = env, diff --cc src/nepi/resources/planetlab/openvswitch/tunnel.py index 0b28327e,359682cf..f076ec13 --- a/src/nepi/resources/planetlab/openvswitch/tunnel.py +++ b/src/nepi/resources/planetlab/openvswitch/tunnel.py @@@ -84,9 -78,8 +84,9 @@@ class OVSTunnel(LinuxApplication) "Specifies the interface's emulated bandwidth in bytes " "per second.", type = Types.Integer, - flags = Flags.ExecReadOnly) + flags = Flags.Design) + cls._register_attribute(network) cls._register_attribute(cipher) cls._register_attribute(cipher_key) cls._register_attribute(txqueuelen) diff --cc src/nepi/util/execfuncs.py index b25ae980,98b9d739..d2da3ace --- a/src/nepi/util/execfuncs.py +++ b/src/nepi/util/execfuncs.py @@@ -60,22 -58,15 +58,22 @@@ def lcopy(source, dest, recursive = Fal command = ["cp"] if recursive: command.append("-R") - - command.append(src) - command.append(dst) - + + if isinstance(dest, str): + dest = dest.split(";") + + if isinstance(src, str): + src = src.split(";") + + args.extend(src) + + args.extend(dest) + - p = subprocess.Popen(command, + proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) -- out, err = p.communicate() ++ out, err = proc.communicate() return ((out, err), proc) def lspawn(command, pidfile, diff --cc src/nepi/util/sshfuncs.py index 1df46b99,81bf5f9b..b9b5342c --- a/src/nepi/util/sshfuncs.py +++ b/src/nepi/util/sshfuncs.py @@@ -205,12 -207,9 +207,11 @@@ def eintr_retry(func) return rv def rexec(command, host, user, - port = None, + port = None, + gwuser = None, + gw = None, agent = True, sudo = False, - stdin = None, identity = None, server_key = None, env = None, @@@ -285,65 -271,24 +284,26 @@@ command = "sudo " + command args.append(command) - - for x in xrange(retry): - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - env = env, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) - - # attach tempfile object to the process, to make sure the file stays - # alive until the process is finished with it - proc._known_hosts = tmp_known_hosts - # by default, rexec calls _communicate which will block - # until the process has exit. The argument block == False - # forces to rexec to return immediately, without blocking - try: - if blocking: - out, err = _communicate(proc, stdin, timeout, err_on_timeout) - else: - err = proc.stderr.read() - out = proc.stdout.read() - - msg = " rexec - host %s - command %s " % (host, " ".join(args)) - log(msg, logging.DEBUG, out, err) - - if proc.poll(): - skip = False - - if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '): - # SSH error, can safely retry - skip = True - elif retry: - # Probably timed out or plain failed but can retry - skip = True - - if skip: - t = x*2 - msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( - t, x, host, " ".join(args)) - log(msg, logging.DEBUG) + log_msg = " rexec - host %s - command %s " % (host, " ".join(args)) - time.sleep(t) - continue - break - except RuntimeError, e: - msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args) - log(msg, logging.DEBUG, out, err) + stdout = stderr = stdin = subprocess.PIPE + if forward_x11: + stdout = stderr = stdin = None - if retry <= 0: - raise - retry -= 1 - - return ((out, err), proc) + return _retry_rexec(args, log_msg, + stderr = stderr, + stdin = stdin, + stdout = stdout, + env = env, + retry = retry, + tmp_known_hosts = tmp_known_hosts, + blocking = blocking) def rcopy(source, dest, - port = None, + port = None, + gwuser = None, + gw = None, agent = True, recursive = False, identity = None, @@@ -410,59 -345,24 +370,24 @@@ if not strict_host_checking: # Do not check for Host key. Unsafe. args.extend(['-o', 'StrictHostKeyChecking=no']) - - if openssh_has_persist(): - args.extend([ - '-o', 'ControlMaster=auto', - '-o', 'ControlPath=%s' % (make_control_path(agent, False),) - ]) - - if isinstance(dest, str): - dest = map(str.strip, dest.split(";")) - - if isinstance(source, str): - source = map(str.strip, source.split(";")) - - args.extend(source) - - args.extend(dest) -- - for x in xrange(retry): - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) - - # attach tempfile object to the process, to make sure the file stays - # alive until the process is finished with it - proc._known_hosts = tmp_known_hosts + - try: - (out, err) = proc.communicate() - eintr_retry(proc.wait)() - msg = " rcopy - host %s - command %s " % (host, " ".join(args)) - log(msg, logging.DEBUG, out, err) - - if proc.poll(): - t = x*2 - msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( - t, x, host, " ".join(args)) - log(msg, logging.DEBUG) - - time.sleep(t) - continue + if isinstance(source, list): + args.extend(source) + else: + if openssh_has_persist(): + args.extend([ + '-o', 'ControlMaster=auto', + '-o', 'ControlPath=%s' % (make_control_path(agent, False),) + ]) + args.append(source) - break - except RuntimeError, e: - msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args) - log(msg, logging.DEBUG, out, err) + args.append(dest) - if retry <= 0: - raise - retry -= 1 - - return ((out, err), proc) + log_msg = " rcopy - host %s - command %s " % (host, " ".join(args)) + + return _retry_rexec(args, log_msg, env = None, retry = retry, + tmp_known_hosts = tmp_known_hosts, + blocking = True) def rspawn(command, pidfile, stdout = '/dev/null', @@@ -736,120 -620,68 +661,67 @@@ f return (out, err), proc - # POSIX - def _communicate(proc, input, timeout=None, err_on_timeout=True): - read_set = [] - write_set = [] - stdout = None # Return - stderr = None # Return - - killed = False - - if timeout is not None: - timelimit = time.time() + timeout - killtime = timelimit + 4 - bailtime = timelimit + 4 - - if proc.stdin: - # Flush stdio buffer. This might block, if the user has - # been writing to .stdin in an uncontrolled fashion. - proc.stdin.flush() - if input: - write_set.append(proc.stdin) - else: - proc.stdin.close() - - if proc.stdout: - read_set.append(proc.stdout) - stdout = [] - - if proc.stderr: - read_set.append(proc.stderr) - stderr = [] - - input_offset = 0 - while read_set or write_set: - if timeout is not None: - curtime = time.time() - if timeout is None or curtime > timelimit: - if curtime > bailtime: - break - elif curtime > killtime: - signum = signal.SIGKILL - else: - signum = signal.SIGTERM - # Lets kill it - os.kill(proc.pid, signum) - select_timeout = 0.5 - else: - select_timeout = timelimit - curtime + 0.1 - else: - select_timeout = 1.0 + def _retry_rexec(args, + log_msg, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE, + env = None, + retry = 3, + tmp_known_hosts = None, + blocking = True): + + for x in xrange(retry): + # connects to the remote host and starts a remote connection + proc = subprocess.Popen(args, + env = env, + stdout = stdout, + stdin = stdin, + stderr = stderr) - if select_timeout > 1.0: - select_timeout = 1.0 - + # attach tempfile object to the process, to make sure the file stays + # alive until the process is finished with it + proc._known_hosts = tmp_known_hosts + + # The argument block == False forces to rexec to return immediately, + # without blocking try: - rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout) - except select.error,e: - if e[0] != 4: - raise - else: - continue - - if not rlist and not wlist and not xlist and proc.poll() is not None: - # timeout and process exited, say bye + err = out = " " + if blocking: + (out, err) = proc.communicate() + elif stdout: + out = proc.stdout.read() + if proc.poll() and stderr: + err = proc.stderr.read() + + log(log_msg, logging.DEBUG, out, err) + + if proc.poll(): + skip = False + + if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '): + # SSH error, can safely retry + skip = True + elif retry: + # Probably timed out or plain failed but can retry + skip = True + + if skip: + t = x*2 + msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( + t, x, " ".join(args)) + log(msg, logging.DEBUG) + + time.sleep(t) + continue break + except RuntimeError, e: + msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg ) + log(msg, logging.DEBUG, out, err) - if proc.stdin in wlist: - # When select has indicated that the file is writable, - # we can write up to PIPE_BUF bytes without risk - # blocking. POSIX defines PIPE_BUF >= 512 - bytes_written = os.write(proc.stdin.fileno(), - buffer(input, input_offset, 512)) - input_offset += bytes_written - - if input_offset >= len(input): - proc.stdin.close() - write_set.remove(proc.stdin) - - if proc.stdout in rlist: - data = os.read(proc.stdout.fileno(), 1024) - if data == "": - proc.stdout.close() - read_set.remove(proc.stdout) - stdout.append(data) - - if proc.stderr in rlist: - data = os.read(proc.stderr.fileno(), 1024) - if data == "": - proc.stderr.close() - read_set.remove(proc.stderr) - stderr.append(data) - - # All data exchanged. Translate lists into strings. - if stdout is not None: - stdout = ''.join(stdout) - if stderr is not None: - stderr = ''.join(stderr) - - # Translate newlines, if requested. We cannot let the file - # object do the translation: It is based on stdio, which is - # impossible to combine with select (unless forcing no - # buffering). - if proc.universal_newlines and hasattr(file, 'newlines'): - if stdout: - stdout = proc._translate_newlines(stdout) - if stderr: - stderr = proc._translate_newlines(stderr) - - if killed and err_on_timeout: - errcode = proc.poll() - raise RuntimeError, ("Operation timed out", errcode, stdout, stderr) - else: - if killed: - proc.poll() - else: - proc.wait() - return (stdout, stderr) + if retry <= 0: + raise + retry -= 1 + + return ((out, err), proc) -