from constants import TESTBED_ID
from nepi.core import testbed_impl
+from nepi.util.constants import TIME_NOW
+from nepi.util.graphtools import mst
+from nepi.util import ipaddr2
import os
+import os.path
import time
+import resourcealloc
+import collections
+import operator
+import functools
+import socket
+import struct
+import tempfile
+import subprocess
+import random
+import shutil
+
+from nepi.util.constants import TESTBED_STATUS_CONFIGURED
+
+class TempKeyError(Exception):
+ pass
class TestbedController(testbed_impl.TestbedController):
def __init__(self, testbed_version):
self._home_directory = None
self.slicename = None
self._traces = dict()
-
+
import node, interfaces, application
self._node = node
self._interfaces = interfaces
@property
def home_directory(self):
return self._home_directory
-
+
@property
def plapi(self):
if not hasattr(self, '_plapi'):
import plcapi
-
+
if self.authUser:
self._plapi = plcapi.PLCAPI(
username = self.authUser,
- password = self.authString)
+ password = self.authString,
+ hostname = self.plcHost,
+ urlpattern = self.plcUrl
+ )
else:
# anonymous access - may not be enough for much
self._plapi = plcapi.PLCAPI()
return self._plapi
-
+
@property
def slice_id(self):
if not hasattr(self, '_slice_id'):
get_attribute_value("authPass")
self.sliceSSHKey = self._attributes.\
get_attribute_value("sliceSSHKey")
+ self.sliceSSHKeyPass = None
+ self.plcHost = self._attributes.\
+ get_attribute_value("plcHost")
+ self.plcUrl = self._attributes.\
+ get_attribute_value("plcUrl")
+ super(TestbedController, self).do_setup()
+
+ def do_post_asynclaunch(self, guid):
+ # Dependencies were launched asynchronously,
+ # so wait for them
+ dep = self._elements[guid]
+ if isinstance(dep, self._app.Dependency):
+ dep.async_setup_wait()
+
+ # Two-phase configuration for asynchronous launch
+ do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
+ do_poststep_configure = staticmethod(do_post_asynclaunch)
def do_preconfigure(self):
# Perform resource discovery if we don't have
# specific resources assigned yet
self.do_resource_discovery()
-
+
# Create PlanetLab slivers
self.do_provisioning()
+ # Plan application deployment
+ self.do_spanning_deployment_plan()
+
# Configure elements per XML data
super(TestbedController, self).do_preconfigure()
-
+
def do_resource_discovery(self):
- # Do what?
+ to_provision = self._to_provision = set()
- # Provisional algo:
+ # Initial algo:
# look for perfectly defined nodes
# (ie: those with only one candidate)
- to_provision = self._to_provision = set()
for guid, node in self._elements.iteritems():
if isinstance(node, self._node.Node) and node._node_id is None:
# Try existing nodes first
# Try again including unassigned nodes
candidates = node.find_candidates()
if len(candidates) > 1:
- raise RuntimeError, "Cannot assign resources for node %s, too many candidates" % (guid,)
+ continue
if len(candidates) == 1:
node_id = iter(candidates).next()
node.assign_node_id(node_id)
to_provision.add(node_id)
elif not candidates:
- raise RuntimeError, "Cannot assign resources for node %s, no candidates" % (guid,)
+ raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
+ node.make_filter_description())
+
+ # Now do the backtracking search for a suitable solution
+ # First with existing slice nodes
+ reqs = []
+ nodes = []
+ for guid, node in self._elements.iteritems():
+ if isinstance(node, self._node.Node) and node._node_id is None:
+ # Try existing nodes first
+ # If we have only one candidate, simply use it
+ candidates = node.find_candidates(
+ filter_slice_id = self.slice_id)
+ reqs.append(candidates)
+ nodes.append(node)
+
+ if nodes and reqs:
+ try:
+ solution = resourcealloc.alloc(reqs)
+ except resourcealloc.ResourceAllocationError:
+ # Failed, try again with all nodes
+ reqs = []
+ for node in nodes:
+ candidates = node.find_candidates()
+ reqs.append(candidates)
+
+ solution = resourcealloc.alloc(reqs)
+ to_provision.update(solution)
+
+ # Do assign nodes
+ for node, node_id in zip(nodes, solution):
+ node.assign_node_id(node_id)
def do_provisioning(self):
if self._to_provision:
cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
new_nodes = list(set(cur_nodes) | self._to_provision)
self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
-
+
# cleanup
del self._to_provision
+ def do_spanning_deployment_plan(self):
+ # Create application groups by collecting all applications
+ # based on their hash - the hash should contain everything that
+ # defines them and the platform they're built
+
+ def dephash(app):
+ return (
+ frozenset((app.depends or "").split(' ')),
+ frozenset((app.sources or "").split(' ')),
+ app.build,
+ app.install,
+ app.node.architecture,
+ app.node.operatingSystem,
+ app.node.pl_distro,
+ )
+
+ depgroups = collections.defaultdict(list)
+
+ for element in self._elements.itervalues():
+ if isinstance(element, self._app.Dependency):
+ depgroups[dephash(element)].append(element)
+
+ # Set up spanning deployment for those applications that
+ # have been deployed in several nodes.
+ for dh, group in depgroups.iteritems():
+ if len(group) > 1:
+ # Pick root (deterministically)
+ root = min(group, key=lambda app:app.node.hostname)
+
+ # Obtain all IPs in numeric format
+ # (which means faster distance computations)
+ for dep in group:
+ dep._ip = socket.gethostbyname(dep.node.hostname)
+ dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
+
+ # Compute plan
+ # NOTE: the plan is an iterator
+ plan = mst.mst(
+ group,
+ lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
+ root = root,
+ maxbranching = 2)
+
+ # Re-sign private key
+ try:
+ tempprk, temppuk, tmppass = self._make_temp_private_key()
+ except TempKeyError:
+ continue
+
+ # Set up slaves
+ plan = list(plan)
+ for slave, master in plan:
+ slave.set_master(master)
+ slave.install_keys(tempprk, temppuk, tmppass)
+
+ # We don't need the user's passphrase anymore
+ self.sliceSSHKeyPass = None
+
+ def _make_temp_private_key(self):
+ # Get the user's key's passphrase
+ if not self.sliceSSHKeyPass:
+ if 'SSH_ASKPASS' in os.environ:
+ proc = subprocess.Popen(
+ [ os.environ['SSH_ASKPASS'],
+ "Please type the passphrase for the %s SSH identity file. "
+ "The passphrase will be used to re-cipher the identity file with "
+ "a random 256-bit key for automated chain deployment on the "
+ "%s PlanetLab slice" % (
+ os.path.basename(self.sliceSSHKey),
+ self.slicename
+ ) ],
+ stdin = open("/dev/null"),
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+ out,err = proc.communicate()
+ self.sliceSSHKeyPass = out.strip()
+
+ if not self.sliceSSHKeyPass:
+ raise TempKeyError
+
+ # Create temporary key files
+ prk = tempfile.NamedTemporaryFile(
+ dir = self.root_directory,
+ prefix = "pl_deploy_tmpk_",
+ suffix = "")
- def set(self, time, guid, name, value):
- super(TestbedController, self).set(time, guid, name, value)
- # TODO: take on account schedule time for the task
+ puk = tempfile.NamedTemporaryFile(
+ dir = self.root_directory,
+ prefix = "pl_deploy_tmpk_",
+ suffix = ".pub")
+
+ # Create secure 256-bits temporary passphrase
+ passphrase = ''.join(map(chr,[rng.randint(0,255)
+ for rng in (random.SystemRandom(),)
+ for i in xrange(32)] )).encode("hex")
+
+ # Copy keys
+ oprk = open(self.sliceSSHKey, "rb")
+ opuk = open(self.sliceSSHKey+".pub", "rb")
+ shutil.copymode(oprk.name, prk.name)
+ shutil.copymode(opuk.name, puk.name)
+ shutil.copyfileobj(oprk, prk)
+ shutil.copyfileobj(opuk, puk)
+ prk.flush()
+ puk.flush()
+ oprk.close()
+ opuk.close()
+
+ # A descriptive comment
+ comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
+
+ # Recipher keys
+ proc = subprocess.Popen(
+ ["ssh-keygen", "-p",
+ "-f", prk.name,
+ "-P", self.sliceSSHKeyPass,
+ "-N", passphrase,
+ "-C", comment ],
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE,
+ stdin = subprocess.PIPE
+ )
+ out, err = proc.communicate()
+
+ if err:
+ raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
+ out, err)
+
+ prk.seek(0)
+ puk.seek(0)
+
+ # Change comment on public key
+ puklines = puk.readlines()
+ puklines[0] = puklines[0].split(' ')
+ puklines[0][-1] = comment+'\n'
+ puklines[0] = ' '.join(puklines[0])
+ puk.seek(0)
+ puk.truncate()
+ puk.writelines(puklines)
+ del puklines
+ puk.flush()
+
+ return prk, puk, passphrase
+
+ def set(self, guid, name, value, time = TIME_NOW):
+ super(TestbedController, self).set(guid, name, value, time)
+ # TODO: take on account schedule time for the task
element = self._elements[guid]
if element:
setattr(element, name, value)
-
+
if hasattr(element, 'refresh'):
# invoke attribute refresh hook
element.refresh()
- def get(self, time, guid, name):
+ def get(self, guid, name, time = TIME_NOW):
+ value = super(TestbedController, self).get(guid, name, time)
# TODO: take on account schedule time for the task
+ factory_id = self._create[guid]
+ factory = self._factories[factory_id]
+ if factory.box_attributes.is_attribute_design_only(name):
+ return value
element = self._elements.get(guid)
- if element:
- try:
- if hasattr(element, name):
- # Runtime attribute
- return getattr(element, name)
- else:
- # Try design-time attributes
- return self.box_get(time, guid, name)
- except KeyError, AttributeError:
- return None
-
- def get_route(self, guid, index, attribute):
- # TODO: fetch real data from planetlab
try:
- return self.box_get_route(guid, int(index), attribute)
+ return getattr(element, name)
except KeyError, AttributeError:
- return None
+ return value
def get_address(self, guid, index, attribute='Address'):
index = int(index)
-
+
# try the real stuff
iface = self._elements.get(guid)
if iface and index == 0:
return iface.netprefix
elif attribute == 'Broadcast':
return iface.broadcast
-
- # if all else fails, query box
- try:
- return self.box_get_address(guid, index, attribute)
- except KeyError, AttributeError:
- return None
+ # if all else fails, query box
+ return super(TestbedController, self).get_address(guid, index, attribute)
def action(self, time, guid, action):
raise NotImplementedError
def shutdown(self):
- for trace in self._traces.values():
+ for trace in self._traces.itervalues():
trace.close()
- for element in self._elements.values():
+ for element in self._elements.itervalues():
# invoke cleanup hooks
if hasattr(element, 'cleanup'):
element.cleanup()
+ for element in self._elements.itervalues():
+ # invoke destroy hooks
+ if hasattr(element, 'destroy'):
+ element.destroy()
+ self._elements.clear()
+ self._traces.clear()
def trace(self, guid, trace_id, attribute='value'):
app = self._elements[guid]
-
+
if attribute == 'value':
path = app.sync_trace(self.home_directory, trace_id)
if path:
content = None
elif attribute == 'path':
content = app.remote_trace_path(trace_id)
+ elif attribute == 'size':
+ # TODO
+ raise NotImplementedError
else:
content = None
return content
-
+
def follow_trace(self, trace_id, trace):
self._traces[trace_id] = trace
+
+ def _make_generic(self, parameters, kind):
+ app = kind(self.plapi)
- def _make_node(self, parameters):
- node = self._node.Node(self.plapi)
-
# Note: there is 1-to-1 correspondence between attribute names
# If that changes, this has to change as well
for attr,val in parameters.iteritems():
- setattr(node, attr, val)
-
- # If emulation is enabled, we automatically need
+ setattr(app, attr, val)
+
+ return app
+
+ def _make_node(self, parameters):
+ node = self._make_generic(parameters, self._node.Node)
+
+ # If emulation is enabled, we automatically need
# some vsys interfaces and packages
if node.emulation:
node.required_vsys.add('ipfw-be')
node.required_packages.add('ipfwslice')
-
+
return node
-
+
def _make_node_iface(self, parameters):
- iface = self._interfaces.NodeIface(self.plapi)
-
- # Note: there is 1-to-1 correspondence between attribute names
- # If that changes, this has to change as well
- for attr,val in parameters.iteritems():
- setattr(iface, attr, val)
-
- return iface
-
+ return self._make_generic(parameters, self._interfaces.NodeIface)
+
def _make_tun_iface(self, parameters):
- iface = self._interfaces.TunIface(self.plapi)
-
- # Note: there is 1-to-1 correspondence between attribute names
- # If that changes, this has to change as well
- for attr,val in parameters.iteritems():
- setattr(iface, attr, val)
-
- return iface
-
+ return self._make_generic(parameters, self._interfaces.TunIface)
+
+ def _make_tap_iface(self, parameters):
+ return self._make_generic(parameters, self._interfaces.TapIface)
+
def _make_netpipe(self, parameters):
- iface = self._interfaces.NetPipe(self.plapi)
-
- # Note: there is 1-to-1 correspondence between attribute names
- # If that changes, this has to change as well
- for attr,val in parameters.iteritems():
- setattr(iface, attr, val)
-
- return iface
-
+ return self._make_generic(parameters, self._interfaces.NetPipe)
+
def _make_internet(self, parameters):
- return self._interfaces.Internet(self.plapi)
-
+ return self._make_generic(parameters, self._interfaces.Internet)
+
def _make_application(self, parameters):
- app = self._app.Application(self.plapi)
-
- # Note: there is 1-to-1 correspondence between attribute names
- # If that changes, this has to change as well
- for attr,val in parameters.iteritems():
- setattr(app, attr, val)
-
- return app
-
+ return self._make_generic(parameters, self._app.Application)
+
+ def _make_dependency(self, parameters):
+ return self._make_generic(parameters, self._app.Dependency)
+
+ def _make_nepi_dependency(self, parameters):
+ return self._make_generic(parameters, self._app.NepiDependency)
+ def _make_ns3_dependency(self, parameters):
+ return self._make_generic(parameters, self._app.NS3Dependency)