port = None,
user = self.slicename,
agent = None,
- ident_key = self.ident_path
+ ident_key = self.ident_path,
+ server_key = self.node.server_key
)
if proc.wait():
port = None,
user = self.slicename,
agent = None,
- ident_key = self.ident_path
+ ident_key = self.ident_path,
+ server_key = self.node.server_key
)
if pidtuple:
port = None,
user = self.slicename,
agent = None,
- ident_key = self.ident_path
+ ident_key = self.ident_path,
+ server_key = self.node.server_key
)
def remote_trace_path(self, whichtrace):
local_path,
port = None,
agent = None,
- ident_key = self.ident_path
+ ident_key = self.ident_path,
+ server_key = self.node.server_key
)
if proc.wait():
port = None,
user = self.slicename,
agent = None,
- ident_key = self.ident_path
+ ident_key = self.ident_path,
+ server_key = self.node.server_key
)
if proc.wait():
os.path.join(self.home_path, 'stdin') ),
port = None,
agent = None,
- ident_key = self.ident_path
+ ident_key = self.ident_path,
+ server_key = self.node.server_key
)
if proc.wait():
source,
"%s@%s:%s" % (self.slicename, self.node.hostname,
os.path.join(self.home_path,'.'),),
- ident_key = self.ident_path
+ ident_key = self.ident_path,
+ server_key = self.node.server_key
)
if proc.wait():
port = None,
user = self.slicename,
agent = None,
- ident_key = self.ident_path
+ ident_key = self.ident_path,
+ server_key = self.node.server_key
)
if proc.wait():
port = None,
user = self.slicename,
agent = None,
- ident_key = self.ident_path
+ ident_key = self.ident_path,
+ server_key = self.node.server_key
)
if proc.wait():
from constants import TESTBED_ID
from nepi.core import testbed_impl
import os
+import time
class TestbedController(testbed_impl.TestbedController):
def __init__(self, testbed_version):
self.sliceSSHKey = self._attributes.\
get_attribute_value("sliceSSHKey")
- def do_create(self):
- # Create node elements per XML data
- super(TestbedController, self).do_create()
-
+ def do_preconfigure(self):
# Perform resource discovery if we don't have
# specific resources assigned yet
self.do_resource_discovery()
# Create PlanetLab slivers
self.do_provisioning()
- # Wait for all nodes to be ready
- self.wait_nodes()
+ # Configure elements per XML data
+ super(TestbedController, self).do_preconfigure()
def do_resource_discovery(self):
# Do what?
- pass
+
+ # Provisional algo:
+ # look for perfectly defined nodes
+ # (ie: those with only one candidate)
+ to_provision = self._to_provision = set()
+ for guid, node in self._elements.iteritems():
+ if isinstance(node, self._node.Node) and node._node_id is None:
+ # Try existing nodes first
+ # If we have only one candidate, simply use it
+ candidates = node.find_candidates(
+ filter_slice_id = self.slice_id)
+ if len(candidates) == 1:
+ node.assign_node_id(iter(candidates).next())
+ else:
+ # Try again including unassigned nodes
+ candidates = node.find_candidates()
+ if len(candidates) > 1:
+ raise RuntimeError, "Cannot assign resources for node %s, too many candidates" % (guid,)
+ if len(candidates) == 1:
+ node_id = iter(candidates).next()
+ node.assign_node_id(node_id)
+ to_provision.add(node_id)
+ elif not candidates:
+ raise RuntimeError, "Cannot assign resources for node %s, no candidates" % (guid,)
def do_provisioning(self):
- # Que te recontra?
- pass
+ if self._to_provision:
+ # Add new nodes to the slice
+ cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
+ new_nodes = list(set(cur_nodes) | self._to_provision)
+ self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
+
+ # cleanup
+ del self._to_provision
- def wait_nodes(self):
- # Suuure...
- pass
def set(self, time, guid, name, value):
super(TestbedController, self).set(time, guid, name, value)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+import time
+
from constants import TESTBED_ID
from nepi.core import metadata
from nepi.core.attributes import Attribute
node.ident_path = testbed_instance.sliceSSHKey
node.slicename = testbed_instance.slicename
- # If we have only one candidate, simply use it
- candidates = node.find_candidates(
- filter_slice_id = testbed_instance.slice_id)
- if len(candidates) == 1:
- node.assign_node_id(iter(candidates).next())
-
# Do some validations
node.validate()
- # TODO: this should be done in parallel in all nodes
+ # recently provisioned nodes may not be up yet
+ sleeptime = 1.0
+ while not node.is_alive():
+ time.sleep(sleeptime)
+ sleeptime = min(30.0, sleeptime*1.5)
+
+ # this will be done in parallel in all nodes
+ # this call only spawns the process
node.install_dependencies()
def configure_application(testbed_instance, guid):
import time
import os
+from nepi.util import server
+
class Node(object):
BASEFILTERS = {
# Map Node attribute to plcapi filter name
# Testbed-derived attributes
self.slicename = None
self.ident_path = None
+ self.server_key = None
self.home_path = None
# Those are filled when an actual node is allocated
self.fetch_node_info()
def fetch_node_info(self):
- info = self._api.GetNodes(self._node_id)
+ info = self._api.GetNodes(self._node_id)[0]
tags = dict( (t['tagname'],t['value'])
for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
if 'interface_ids' in info:
self.min_num_external_ifaces = \
self.max_num_external_ifaces = len(info['interface_ids'])
+
+ if 'ssh_rsa_key' in info:
+ self.server_key = info['ssh_rsa_key']
def validate(self):
if self.home_path is None:
user = self.slicename,
agent = None,
ident_key = self.ident_path,
+ server_key = self.server_key,
sudo = True
)
port = None,
user = self.slicename,
agent = None,
- ident_key = self.ident_path
+ ident_key = self.ident_path,
+ server_key = self.server_key
)
if pidtuple:
pid, ppid = pidtuple
port = None,
user = self.slicename,
agent = None,
- ident_key = self.ident_path
+ ident_key = self.ident_path,
+ server_key = self.server_key
):
time.sleep(probe)
-
+ def is_alive(self):
+ # Make sure all the paths are created where
+ # they have to be created for deployment
+ (out,err),proc = server.popen_ssh_command(
+ "echo 'ALIVE'",
+ host = self.hostname,
+ port = None,
+ user = self.slicename,
+ agent = None,
+ ident_key = self.ident_path,
+ server_key = self.server_key
+ )
+
+ if proc.wait():
+ return False
+ elif not err and out.strip() == 'ALIVE':
+ return True
+ else:
+ return False
+
(peer['peername'], peer['peer_id'])
for peer in peers
)
- self._peer_map = dict(
+ self._peer_map.update(
(peer['peer_id'], peer['shortname'])
for peer in peers
)
+ self._peer_map[None] = self._localPeerName
return self._peer_map
filters.update(kw)
return self.api.GetSlices(self.auth, filters, *fieldstuple)
+ def UpdateSlice(self, sliceIdOrName, **kw):
+ return self.api.UpdateSlice(self.auth, sliceIdOrName, kw)
+
"""
def remote_spawn(command, pidfile, stdout='/dev/null', stderr=STDOUT, stdin='/dev/null', home=None, create_home=False, sudo=False,
- host = None, port = None, user = None, agent = None, ident_key = None, tty = False):
+ host = None, port = None, user = None, agent = None,
+ ident_key = None, server_key = None,
+ tty = False):
"""
Spawn a remote command such that it will continue working asynchronously.
user = user,
agent = agent,
ident_key = ident_key,
+ server_key = server_key,
tty = tty
)
return (out,err),proc
def remote_check_pid(pidfile,
- host = None, port = None, user = None, agent = None, ident_key = None):
+ host = None, port = None, user = None, agent = None,
+ ident_key = None, server_key = None):
"""
Check the pidfile of a process spawned with remote_spawn.
port = port,
user = user,
agent = agent,
- ident_key = ident_key
+ ident_key = ident_key,
+ server_key = server_key
)
if proc.wait():
def remote_status(pid, ppid,
- host = None, port = None, user = None, agent = None, ident_key = None):
+ host = None, port = None, user = None, agent = None,
+ ident_key = None, server_key = None):
"""
Check the status of a process spawned with remote_spawn.
port = port,
user = user,
agent = agent,
- ident_key = ident_key
+ ident_key = ident_key,
+ server_key = server_key
)
if proc.wait():
def remote_kill(pid, ppid, sudo = False,
- host = None, port = None, user = None, agent = None, ident_key = None):
+ host = None, port = None, user = None, agent = None,
+ ident_key = None, server_key = None):
"""
Kill a process spawned with remote_spawn.
port = port,
user = user,
agent = agent,
- ident_key = ident_key
+ ident_key = ident_key,
+ server_key = server_key
)
# wait, don't leave zombies around
import traceback
import signal
import re
+import tempfile
CTRL_SOCK = "ctrl.sock"
STD_ERR = "stderr.log"
encoded = data.rstrip()
return base64.b64decode(encoded)
+def _make_server_key_args(server_key, host, port, args):
+ """
+ Returns a reference to the created temporary file, and adds the
+ corresponding arguments to the given argument list.
+
+ Make sure to hold onto it until the process is done with the file
+ """
+ if port is not None:
+ host = '%s:%s' % (host,port)
+ # Create a temporary server key file
+ tmp_known_hosts = tempfile.NamedTemporaryFile()
+ tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
+ tmp_known_hosts.flush()
+ args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
+ return tmp_known_hosts
+
def popen_ssh_command(command, host, port, user, agent,
stdin="",
ident_key = None,
+ server_key = None,
tty = False):
"""
Executes a remote commands, returns ((stdout,stderr),process)
"""
+ tmp_known_hosts = None
args = ['ssh',
# Don't bother with localhost. Makes test easier
'-o', 'NoHostAuthenticationForLocalhost=yes',
args.extend(('-i', ident_key))
if tty:
args.append('-t')
+ if server_key:
+ # Create a temporary server key file
+ tmp_known_hosts = _make_server_key_args(
+ server_key, host, port, args)
args.append(command)
# connects to the remote host and starts a remote connection
stdout = subprocess.PIPE,
stdin = subprocess.PIPE,
stderr = subprocess.PIPE)
+
+ # attach tempfile object to the process, to make sure the file stays
+ # alive until the process is finished with it
+ proc._known_hosts = tmp_known_hosts
+
return (proc.communicate(stdin), proc)
def popen_scp(source, dest,
port = None,
agent = None,
recursive = False,
- ident_key = None):
+ ident_key = None,
+ server_key = None):
"""
Copies from/to remote sites.
or hasattr(source, 'read') or hasattr(dest, 'write'):
assert not resursive
- # Parse destination as <user>@<server>:<path>
- tgtspec, path = dest.split(':',1)
- user,host = tgtspec.rsplit('@',1)
+ # Parse source/destination as <user>@<server>:<path>
+ if isinstance(dest, basestring) and ':' in dest:
+ remspec, path = dest.split(':',1)
+ elif isinstance(source, basestring) and ':' in source:
+ remspec, path = source.split(':',1)
+ else:
+ raise ValueError, "Both endpoints cannot be local"
+ user,host = remspec.rsplit('@',1)
+ tmp_known_hosts = None
args = ['ssh', '-l', user, '-C',
# Don't bother with localhost. Makes test easier
args.append('-P%d' % port)
if ident_key:
args.extend(('-i', ident_key))
+ if server_key:
+ # Create a temporary server key file
+ tmp_known_hosts = _make_server_key_args(
+ server_key, host, port, args)
if isinstance(source, file) or hasattr(source, 'read'):
args.append('cat > %s' % (shell_escape(path),))
stderr = subprocess.PIPE,
stdin = source)
err = proc.stderr.read()
+ proc._known_hosts = tmp_known_hosts
proc.wait()
return ((None,err), proc)
elif isinstance(dest, file):
stderr = subprocess.PIPE,
stdin = source)
err = proc.stderr.read()
+ proc._known_hosts = tmp_known_hosts
proc.wait()
return ((None,err), proc)
elif hasattr(source, 'read'):
break
err.append(proc.stderr.read())
+ proc._known_hosts = tmp_known_hosts
proc.wait()
return ((None,''.join(err)), proc)
elif hasattr(dest, 'write'):
break
err.append(proc.stderr.read())
+ proc._known_hosts = tmp_known_hosts
proc.wait()
return ((None,''.join(err)), proc)
else:
raise AssertionError, "Unreachable code reached! :-Q"
else:
+ # Parse destination as <user>@<server>:<path>
+ if isinstance(dest, basestring) and ':' in dest:
+ remspec, path = dest.split(':',1)
+ elif isinstance(source, basestring) and ':' in source:
+ remspec, path = source.split(':',1)
+ else:
+ raise ValueError, "Both endpoints cannot be local"
+ user,host = remspec.rsplit('@',1)
+
# plain scp
+ tmp_known_hosts = None
args = ['scp', '-q', '-p', '-C',
# Don't bother with localhost. Makes test easier
'-o', 'NoHostAuthenticationForLocalhost=yes' ]
args.append('-r')
if ident_key:
args.extend(('-i', ident_key))
+ if server_key:
+ # Create a temporary server key file
+ tmp_known_hosts = _make_server_key_args(
+ server_key, host, port, args)
args.append(source)
args.append(dest)
stdout = subprocess.PIPE,
stdin = subprocess.PIPE,
stderr = subprocess.PIPE)
+ proc._known_hosts = tmp_known_hosts
+
comm = proc.communicate()
proc.wait()
return (comm, proc)
def popen_ssh_subprocess(python_code, host, port, user, agent,
python_path = None,
ident_key = None,
+ server_key = None,
tty = False):
if python_path:
python_path.replace("'", r"'\''")
cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
cmd += "exec(cmd)\n'"
+ tmp_known_hosts = None
args = ['ssh',
# Don't bother with localhost. Makes test easier
'-o', 'NoHostAuthenticationForLocalhost=yes',
args.extend(('-i', ident_key))
if tty:
args.append('-t')
+ if server_key:
+ # Create a temporary server key file
+ tmp_known_hosts = _make_server_key_args(
+ server_key, host, port, args)
args.append(cmd)
# connects to the remote host and starts a remote rpyc connection
stdout = subprocess.PIPE,
stdin = subprocess.PIPE,
stderr = subprocess.PIPE)
+ proc._known_hosts = tmp_known_hosts
+
# send the command to execute
os.write(proc.stdin.fileno(),
base64.b64encode(python_code) + "\n")