-#!/usr/bin/env python
# -*- coding: utf-8 -*-
-from constants import TESTBED_ID
+from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
+from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
+from nepi.util import environ
+from nepi.util.parallel import ParallelRun
+import threading
import sys
import os
import os.path
import subprocess
import random
import shutil
-
-from nepi.util.constants import TESTBED_STATUS_CONFIGURED
+import logging
+import metadata
+import weakref
+import util as plutil
class TempKeyError(Exception):
pass
class TestbedController(testbed_impl.TestbedController):
- def __init__(self, testbed_version):
- super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
+ def __init__(self):
+ super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
self._home_directory = None
self.slicename = None
self._traces = dict()
- import node, interfaces, application
+ import node, interfaces, application, multicast
self._node = node
self._interfaces = interfaces
self._app = application
+ self._multicast = multicast
self._blacklist = set()
+ self._just_provisioned = set()
+
+ self._load_blacklist()
+
+ self._slice_id = None
+ self._plcapi = None
+ self._sliceapi = None
+ self._vsys_vnet = None
+
+ self._logger = logging.getLogger('nepi.testbeds.planetlab')
+
+ self.recovering = False
@property
def home_directory(self):
return self._home_directory
@property
- def plapi(self):
- if not hasattr(self, '_plapi'):
+ def plcapi(self):
+ if not self._plcapi:
import plcapi
-
- if self.authUser:
- self._plapi = plcapi.PLCAPI(
- username = self.authUser,
- password = self.authString,
- hostname = self.plcHost,
- urlpattern = self.plcUrl
+ self._plcapi = plcapi.plcapi(
+ self.authUser,
+ self.authString,
+ self.plcHost,
+ self.plcUrl
)
+ return self._plcapi
+
+ @property
+ def sliceapi(self):
+ if not self._sliceapi:
+ if not self.sfa:
+ self._sliceapi = self.plcapi
else:
- # anonymous access - may not be enough for much
- self._plapi = plcapi.PLCAPI()
- return self._plapi
+ from nepi.util import sfiapi
+ self._sliceapi = sfiapi.sfiapi(self.slice_id)
+ return self._sliceapi
@property
def slice_id(self):
- if not hasattr(self, '_slice_id'):
- slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
- if slices:
- self._slice_id = slices[0]['slice_id']
- else:
- # If it wasn't found, don't remember this failure, keep trying
- return None
+ if not self._slice_id:
+ self._slice_id = self.sliceapi.GetSliceId(self.slicename)
return self._slice_id
+
+ @property
+ def vsys_vnet(self):
+ if not self._vsys_vnet:
+ self._vsys_vnet = self.sliceapi.GetSliceVnetSysTag(self.slicename)
+ return self._vsys_vnet
+ def _load_blacklist(self):
+ blpath = environ.homepath('plblacklist')
+
+ try:
+ bl = open(blpath, "r")
+ except:
+ self._blacklist = set()
+ return
+
+ try:
+ self._blacklist = set(
+ map(str.strip, bl.readlines())
+ )
+ finally:
+ bl.close()
+
+ def _save_blacklist(self):
+ blpath = environ.homepath('plblacklist')
+ bl = open(blpath, "w")
+ try:
+ bl.writelines(
+ map('%s\n'.__mod__, self._blacklist))
+ finally:
+ bl.close()
+
def do_setup(self):
self._home_directory = self._attributes.\
get_attribute_value("homeDirectory")
get_attribute_value("plcHost")
self.plcUrl = self._attributes.\
get_attribute_value("plcUrl")
+ self.logLevel = self._attributes.\
+ get_attribute_value("plLogLevel")
+ self.tapPortBase = self._attributes.\
+ get_attribute_value("tapPortBase")
+ self.p2pDeployment = self._attributes.\
+ get_attribute_value("p2pDeployment")
+ self.cleanProc = self._attributes.\
+ get_attribute_value("cleanProc")
+ self.cleanHome = self._attributes.\
+ get_attribute_value("cleanHome")
+ self.sfa = self._attributes.\
+ get_attribute_value("sfa")
+ if self.sfa:
+ self._slice_id = self._attributes.\
+ get_attribute_value("sliceHrn")
+
+ if not self.slicename:
+ raise RuntimeError, "Slice not set"
+ if not self.authUser:
+ raise RuntimeError, "PlanetLab account username not set"
+ if not self.authString:
+ raise RuntimeError, "PlanetLab account passphrase not set"
+ if not self.sliceSSHKey:
+ raise RuntimeError, "PlanetLab account key not specified"
+ if not os.path.exists(self.sliceSSHKey):
+ raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)
+
+ self._logger.setLevel(getattr(logging,self.logLevel))
+
super(TestbedController, self).do_setup()
def do_post_asynclaunch(self, guid):
# Oh... retry...
pass
- # Plan application deployment
- self.do_spanning_deployment_plan()
+ if self.p2pDeployment:
+ # Plan application deployment
+ self.do_spanning_deployment_plan()
# Configure elements per XML data
super(TestbedController, self).do_preconfigure()
- def do_resource_discovery(self):
+ def do_resource_discovery(self, recover = False):
to_provision = self._to_provision = set()
reserved = set(self._blacklist)
for guid, node in self._elements.iteritems():
if isinstance(node, self._node.Node) and node._node_id is not None:
- reserved.add(node._node_id)
+ reserved.add(node.hostname)
# Initial algo:
# look for perfectly defined nodes
# (ie: those with only one candidate)
- for guid, node in self._elements.iteritems():
- if isinstance(node, self._node.Node) and node._node_id is None:
- # Try existing nodes first
- # If we have only one candidate, simply use it
- candidates = node.find_candidates(
- filter_slice_id = self.slice_id)
- candidates -= reserved
- if len(candidates) == 1:
- node_id = iter(candidates).next()
- node.assign_node_id(node_id)
- reserved.add(node_id)
- elif not candidates:
+ reserve_lock = threading.RLock()
+ def assignifunique(guid, node):
+ # Try existing nodes first
+ # If we have only one candidate, simply use it
+ candidates = node.find_candidates(
+ filter_slice_id = self.slice_id)
+
+ node_id = None
+ candidate_hosts = set(candidates.keys() if candidates else [])
+ reserve_lock.acquire()
+ try:
+ candidate_hosts -= reserved
+ if len(candidate_hosts) == 1:
+ hostname = iter(candidate_hosts).next()
+ node_id = candidates[hostname]
+ reserved.add(hostname)
+ elif not candidate_hosts:
# Try again including unassigned nodes
- candidates = node.find_candidates()
- if len(candidates) > 1:
- continue
- if len(candidates) == 1:
- node_id = iter(candidates).next()
- node.assign_node_id(node_id)
+ reserve_lock.release()
+ try:
+ candidates = node.find_candidates()
+ finally:
+ reserve_lock.acquire()
+ candidate_hosts = set(candidates.keys() if candidates else [])
+ candidate_hosts -= reserved
+ if len(candidate_hosts) > 1:
+ return
+ if len(candidate_hosts) == 1:
+ hostname = iter(candidate_hosts).next()
+ node_id = candidates[hostname]
to_provision.add(node_id)
- reserved.add(node_id)
+ reserved.add(hostname)
elif not candidates:
- raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
+ raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
node.make_filter_description())
+ finally:
+ reserve_lock.release()
+
+ if node_id is not None:
+ node.assign_node_id(node_id)
+
+ runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it
+ runner.start()
+ for guid, node in self._elements.iteritems():
+ if isinstance(node, self._node.Node) and node._node_id is None:
+ runner.put(assignifunique, guid, node)
+ runner.sync()
# Now do the backtracking search for a suitable solution
# First with existing slice nodes
reqs = []
nodes = []
+ def genreqs(node, filter_slice_id=None):
+ # Try existing nodes first
+ # If we have only one candidate, simply use it
+ candidates = node.find_candidates(
+ filter_slice_id = filter_slice_id)
+ for r in reserved:
+ if candidates.has_key(r):
+ del candidates[r]
+ reqs.append(candidates.values())
+ nodes.append(node)
for guid, node in self._elements.iteritems():
if isinstance(node, self._node.Node) and node._node_id is None:
- # Try existing nodes first
- # If we have only one candidate, simply use it
- candidates = node.find_candidates(
- filter_slice_id = self.slice_id)
- candidates -= reserved
- reqs.append(candidates)
- nodes.append(node)
-
+ runner.put(genreqs, node, self.slice_id)
+ runner.sync()
+
if nodes and reqs:
+ if recover:
+ raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)
+
+ def pickbest(fullset, nreq, node=nodes[0]):
+ if len(fullset) > nreq:
+ fullset = zip(node.rate_nodes(fullset),fullset)
+ fullset.sort(reverse=True)
+ del fullset[nreq:]
+ return set(map(operator.itemgetter(1),fullset))
+ else:
+ return fullset
+
try:
- solution = resourcealloc.alloc(reqs)
+ solution = resourcealloc.alloc(reqs, sample=pickbest)
except resourcealloc.ResourceAllocationError:
# Failed, try again with all nodes
reqs = []
for node in nodes:
- candidates = node.find_candidates()
- reqs.append(candidates)
-
- solution = resourcealloc.alloc(reqs)
+ runner.put(genreqs, node)
+ runner.sync()
+ solution = resourcealloc.alloc(reqs, sample=pickbest)
to_provision.update(solution)
# Do assign nodes
for node, node_id in zip(nodes, solution):
- node.assign_node_id(node_id)
+ runner.put(node.assign_node_id, node_id)
+ runner.join()
def do_provisioning(self):
if self._to_provision:
# Add new nodes to the slice
- cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
+ cur_nodes = self.sliceapi.GetSliceNodes(self.slice_id)
new_nodes = list(set(cur_nodes) | self._to_provision)
- self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
+ self.sliceapi.AddSliceNodes(self.slice_id, nodes=new_nodes)
# cleanup
+ self._just_provisioned = self._to_provision
del self._to_provision
def do_wait_nodes(self):
node.slicename = self.slicename
# Show the magic
- print "PlanetLab Node", guid, "configured at", node.hostname
-
+ self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
+
try:
+ runner = ParallelRun(maxthreads=64, maxqueue=1)
+ abort = []
+ def waitforit(guid, node):
+ try:
+ node.wait_provisioning(
+ (20*60 if node._node_id in self._just_provisioned else 60)
+ )
+
+ self._logger.info("READY Node %s at %s", guid, node.hostname)
+
+ # Prepare dependency installer now
+ node.prepare_dependencies()
+ except:
+ abort.append(None)
+ raise
+
for guid, node in self._elements.iteritems():
+ if abort:
+ break
if isinstance(node, self._node.Node):
- print "Waiting for Node", guid, "configured at", node.hostname,
- sys.stdout.flush()
+ self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
+ runner.put(waitforit, guid, node)
+ runner.join()
- node.wait_provisioning()
-
- print "READY"
except self._node.UnresponsiveNodeError:
# Uh...
- print "UNRESPONSIVE"
+ self._logger.warn("UNRESPONSIVE Nodes")
# Mark all dead nodes (which are unresponsive) on the blacklist
# and re-raise
for guid, node in self._elements.iteritems():
if isinstance(node, self._node.Node):
if not node.is_alive():
- print "Blacklisting", node.hostname, "for unresponsiveness"
- self._blacklist.add(node._node_id)
+ self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
+ self._blacklist.add(node.hostname)
node.unassign_node()
+
+ try:
+ self._save_blacklist()
+ except:
+ # not important...
+ import traceback
+ traceback.print_exc()
+
raise
def do_spanning_deployment_plan(self):
app.node.architecture,
app.node.operatingSystem,
app.node.pl_distro,
+ app.__class__,
)
depgroups = collections.defaultdict(list)
for element in self._elements.itervalues():
if isinstance(element, self._app.Dependency):
depgroups[dephash(element)].append(element)
+ elif isinstance(element, self._node.Node):
+ deps = element._yum_dependencies
+ if deps:
+ depgroups[dephash(deps)].append(deps)
# Set up spanning deployment for those applications that
# have been deployed in several nodes.
suffix = ".pub")
# Create secure 256-bits temporary passphrase
- passphrase = ''.join(map(chr,[rng.randint(0,255)
- for rng in (random.SystemRandom(),)
- for i in xrange(32)] )).encode("hex")
+ passphrase = os.urandom(32).encode("hex")
# Copy keys
oprk = open(self.sliceSSHKey, "rb")
# TODO: take on account schedule time for the task
element = self._elements[guid]
if element:
- setattr(element, name, value)
+ if name == "up":
+ if value == True:
+ element.if_up()
+ else:
+ element.if_down()
+
+ try:
+ setattr(element, name, value)
+ except:
+ # We ignore these errors while recovering.
+ # Some attributes are immutable, and setting
+ # them is necessary (to recover the state), but
+ # some are not (they throw an exception).
+ if not self.recovering:
+ raise
if hasattr(element, 'refresh'):
# invoke attribute refresh hook
# TODO: take on account schedule time for the task
factory_id = self._create[guid]
factory = self._factories[factory_id]
- if factory.box_attributes.is_attribute_design_only(name):
- return value
element = self._elements.get(guid)
try:
return getattr(element, name)
- except KeyError, AttributeError:
+ except (KeyError, AttributeError):
return value
def get_address(self, guid, index, attribute='Address'):
def shutdown(self):
for trace in self._traces.itervalues():
trace.close()
- for element in self._elements.itervalues():
- # invoke cleanup hooks
- if hasattr(element, 'cleanup'):
- element.cleanup()
- for element in self._elements.itervalues():
- # invoke destroy hooks
- if hasattr(element, 'destroy'):
- element.destroy()
+
+ def invokeif(action, testbed, guid):
+ element = self._elements[guid]
+ if hasattr(element, action):
+ getattr(element, action)()
+
+ self._do_in_factory_order(
+ functools.partial(invokeif, 'cleanup'),
+ metadata.shutdown_order)
+
+ self._do_in_factory_order(
+ functools.partial(invokeif, 'destroy'),
+ metadata.shutdown_order)
+
self._elements.clear()
self._traces.clear()
def trace(self, guid, trace_id, attribute='value'):
- app = self._elements[guid]
+ elem = self._elements[guid]
if attribute == 'value':
- path = app.sync_trace(self.home_directory, trace_id)
+ path = elem.sync_trace(self.home_directory, trace_id)
if path:
fd = open(path, "r")
content = fd.read()
else:
content = None
elif attribute == 'path':
- content = app.remote_trace_path(trace_id)
- elif attribute == 'size':
- # TODO
- raise NotImplementedError
+ content = elem.remote_trace_path(trace_id)
+ elif attribute == 'name':
+ content = elem.remote_trace_name(trace_id)
else:
content = None
return content
def follow_trace(self, trace_id, trace):
self._traces[trace_id] = trace
+
+ def recover(self):
+ try:
+ # An internal flag, so we know to behave differently in
+ # a few corner cases.
+ self.recovering = True
+
+ # Create and connect do not perform any real tasks against
+ # the nodes, it only sets up the object hierarchy,
+ # so we can run them normally
+ self.do_create()
+ self.do_connect_init()
+ self.do_connect_compl()
+
+ # Manually recover nodes, to mark dependencies installed
+ # and clean up mutable attributes
+ self._do_in_factory_order(
+ lambda self, guid : self._elements[guid].recover(),
+ [
+ metadata.NODE,
+ ])
+
+ # Assign nodes - since we're working off exeucte XML, nodes
+ # have specific hostnames assigned and we don't need to do
+ # real assignment, only find out node ids and check liveliness
+ self.do_resource_discovery(recover = True)
+ self.do_wait_nodes()
+
+ # Pre/post configure, however, tends to set up tunnels
+ # Execute configuration steps only for those object
+ # kinds that do not have side effects
+
+ # Do the ones without side effects,
+ # including nodes that need to set up home
+ # folders and all that
+ self._do_in_factory_order(
+ "preconfigure_function",
+ [
+ metadata.INTERNET,
+ Parallel(metadata.NODE),
+ metadata.NODEIFACE,
+ ])
+
+ # Tunnels require a home path that is configured
+ # at this step. Since we cannot run the step itself,
+ # we need to inject this homepath ourselves
+ for guid, element in self._elements.iteritems():
+ if isinstance(element, self._interfaces.TunIface):
+ element._home_path = "tun-%s" % (guid,)
+
+ # Manually recover tunnels, applications and
+ # netpipes, negating the side effects
+ self._do_in_factory_order(
+ lambda self, guid : self._elements[guid].recover(),
+ [
+ Parallel(metadata.TAPIFACE),
+ Parallel(metadata.TUNIFACE),
+ metadata.NETPIPE,
+ Parallel(metadata.NEPIDEPENDENCY),
+ Parallel(metadata.NS3DEPENDENCY),
+ Parallel(metadata.DEPENDENCY),
+ Parallel(metadata.APPLICATION),
+ Parallel(metadata.CCNXDAEMON),
+ ])
+
+ # Tunnels are not harmed by configuration after
+ # recovery, and some attributes get set this way
+ # like external_iface
+ self._do_in_factory_order(
+ "preconfigure_function",
+ [
+ Parallel(metadata.TAPIFACE),
+ Parallel(metadata.TUNIFACE),
+ ])
+
+ # Post-do the ones without side effects
+ self._do_in_factory_order(
+ "configure_function",
+ [
+ metadata.INTERNET,
+ Parallel(metadata.NODE),
+ metadata.NODEIFACE,
+ Parallel(metadata.TAPIFACE),
+ Parallel(metadata.TUNIFACE),
+ ])
+
+ # There are no required prestart steps
+ # to call upon recovery, so we're done
+ finally:
+ self.recovering = True
- def _make_generic(self, parameters, kind):
- app = kind(self.plapi)
+ def _make_generic(self, parameters, kind, **kwargs):
+ args = dict({'api': self.plcapi})
+ args.update(kwargs)
+ app = kind(**args)
+ app.testbed = weakref.ref(self)
# Note: there is 1-to-1 correspondence between attribute names
# If that changes, this has to change as well
for attr,val in parameters.iteritems():
- setattr(app, attr, val)
+ try:
+ setattr(app, attr, val)
+ except:
+ # We ignore these errors while recovering.
+ # Some attributes are immutable, and setting
+ # them is necessary (to recover the state), but
+ # some are not (they throw an exception).
+ if not self.recovering:
+ raise
return app
def _make_node(self, parameters):
- node = self._make_generic(parameters, self._node.Node)
-
- # If emulation is enabled, we automatically need
- # some vsys interfaces and packages
- if node.emulation:
- node.required_vsys.add('ipfw-be')
- node.required_packages.add('ipfwslice')
-
+ args = dict({'sliceapi': self.sliceapi})
+ node = self._make_generic(parameters, self._node.Node, **args)
+ node.enable_proc_cleanup = self.cleanProc
+ node.enable_home_cleanup = self.cleanHome
return node
def _make_node_iface(self, parameters):
def _make_internet(self, parameters):
return self._make_generic(parameters, self._interfaces.Internet)
- def _make_application(self, parameters):
- return self._make_generic(parameters, self._app.Application)
+ def _make_application(self, parameters, clazz = None):
+ if not clazz:
+ clazz = self._app.Application
+ return self._make_generic(parameters, clazz)
def _make_dependency(self, parameters):
return self._make_generic(parameters, self._app.Dependency)
def _make_ns3_dependency(self, parameters):
return self._make_generic(parameters, self._app.NS3Dependency)
+ def _make_tun_filter(self, parameters):
+ return self._make_generic(parameters, self._interfaces.TunFilter)
+
+ def _make_class_queue_filter(self, parameters):
+ return self._make_generic(parameters, self._interfaces.ClassQueueFilter)
+
+ def _make_tos_queue_filter(self, parameters):
+ return self._make_generic(parameters, self._interfaces.ToSQueueFilter)
+
+ def _make_multicast_forwarder(self, parameters):
+ return self._make_generic(parameters, self._multicast.MulticastForwarder)
+
+ def _make_multicast_announcer(self, parameters):
+ return self._make_generic(parameters, self._multicast.MulticastAnnouncer)
+
+ def _make_multicast_router(self, parameters):
+ return self._make_generic(parameters, self._multicast.MulticastRouter)
+
+