-#!/usr/bin/env python
# -*- coding: utf-8 -*-
from constants import TESTBED_ID
import logging
import ipaddr
import operator
+import re
from nepi.util import server
from nepi.util import parallel
minLoad = _castproperty(float, '_minLoad')
maxLoad = _castproperty(float, '_maxLoad')
- def __init__(self, api=None):
+ def __init__(self, api=None, sliceapi=None):
if not api:
api = plcapi.PLCAPI()
self._api = api
+ self._sliceapi = sliceapi or api
# Attributes
self.hostname = None
self.maxLoad = None
self.min_num_external_ifaces = None
self.max_num_external_ifaces = None
- self.timeframe = 'm'
+ self._timeframe = 'w'
# Applications and routes add requirements to connected nodes
self.required_packages = set()
self.ident_path = None
self.server_key = None
self.home_path = None
- self.enable_cleanup = False
+ self.enable_proc_cleanup = False
+ self.enable_home_cleanup = False
# Those are filled when an actual node is allocated
self._node_id = None
# Logging
self._logger = logging.getLogger('nepi.testbeds.planetlab')
+
+ def set_timeframe(self, timeframe):
+ # Map a human-readable timeframe name onto the one-letter suffix
+ # used in PLC tag-name templates ("" = latest, "m" = month,
+ # "y" = year); see the 'timeframe' replacements built elsewhere.
+ if timeframe == "latest":
+ self._timeframe = ""
+ elif timeframe == "month":
+ self._timeframe = "m"
+ elif timeframe == "year":
+ self._timeframe = "y"
+ else:
+ # Any unrecognized value falls back to the default, week ("w").
+ self._timeframe = "w"
+
+ def get_timeframe(self):
+ # Inverse of set_timeframe: translate the stored one-letter code
+ # back to its human-readable name ("latest"/"month"/"year"/"week").
+ if self._timeframe == "":
+ return "latest"
+ if self._timeframe == "m":
+ return "month"
+ if self._timeframe == "y":
+ return "year"
+ return "week"
+
+ # Public accessor pair exposed as a property so callers keep using
+ # node.timeframe while the internal one-letter code stays private.
+ timeframe = property(get_timeframe, set_timeframe)
def _nepi_testbed_environment_setup_get(self):
command = cStringIO.StringIO()
for envval in envvals:
command.write(' ; export %s=%s' % (envkey, envval))
return command.getvalue()
+
def _nepi_testbed_environment_setup_set(self, value):
pass
+
_nepi_testbed_environment_setup = property(
_nepi_testbed_environment_setup_get,
_nepi_testbed_environment_setup_set)
self._logger.info("Finding candidates for %s", self.make_filter_description())
fields = ('node_id',)
- replacements = {'timeframe':self.timeframe}
+ replacements = {'timeframe':self._timeframe}
# get initial candidates (no tag filters)
basefilters = self.build_filters({}, self.BASEFILTERS)
extra['peer'] = self.site
candidates = set(map(operator.itemgetter('node_id'),
- self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
-
+ self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))
+
# filter by tag, one tag at a time
applicable = self.applicable_filters
for tagfilter in self.TAGFILTERS.iteritems():
if attr in applicable:
tagfilter = rootfilters.copy()
tagfilter['tagname'] = tagname % replacements
- tagfilter[expr % replacements] = getattr(self,attr)
+ tagfilter[expr % replacements] = str(getattr(self,attr))
tagfilter['node_id'] = list(candidates)
-
+
candidates &= set(map(operator.itemgetter('node_id'),
- self._api.GetNodeTags(filters=tagfilter, fields=fields)))
-
+ self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))
+
# filter by vsys tags - special case since it doesn't follow
# the usual semantics
if self.required_vsys:
newcandidates = collections.defaultdict(set)
- vsys_tags = self._api.GetNodeTags(
+ vsys_tags = self._sliceapi.GetNodeTags(
tagname='vsys',
node_id = list(candidates),
fields = ['node_id','value'])
-
+
vsys_tags = map(
operator.itemgetter(['node_id','value']),
vsys_tags)
filters = basefilters.copy()
filters['node_id'] = list(candidates)
ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
- self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
+ self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
# filter candidates by interface count
if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
candidates = set(filter(predicate, candidates))
-
+
# make sure hostnames are resolvable
+ hostnames = dict()
if candidates:
self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))
-
+
hostnames = dict(map(operator.itemgetter('node_id','hostname'),
- self._api.GetNodes(list(candidates), ['node_id','hostname'])
+ self._sliceapi.GetNodes(list(candidates), ['node_id','hostname'])
))
+
def resolvable(node_id):
try:
addr = socket.gethostbyname(hostnames[node_id])
maxthreads = 16))
self._logger.info(" Found %s reachable candidates.", len(candidates))
-
- return candidates
+
+ for h in hostnames.keys():
+ if h not in candidates:
+ del hostnames[h]
+
+ hostnames = dict((v,k) for k, v in hostnames.iteritems())
+
+ return hostnames
def make_filter_description(self):
"""
def unassign_node(self):
self._node_id = None
- self.__dict__.update(self.__orig_attrs)
+ self.hostip = None
+
+ # Restore the attribute snapshot saved by fetch_node_info, if any.
+ # setattr is used instead of the old __dict__.update so that
+ # properties (e.g. timeframe) go through their setters.
+ try:
+ orig_attrs = self.__orig_attrs
+ except AttributeError:
+ # Node info was never fetched; nothing to restore.
+ return
+
+ for key, value in orig_attrs.iteritems():
+ setattr(self, key, value)
+ del self.__orig_attrs
def rate_nodes(self, nodes):
rates = collections.defaultdict(int)
tags = collections.defaultdict(dict)
- replacements = {'timeframe':self.timeframe}
+ replacements = {'timeframe':self._timeframe}
tagnames = [ tagname % replacements
for tagname, weight, default in self.RATE_FACTORS ]
-
- taginfo = self._api.GetNodeTags(
+
+ taginfo = self._sliceapi.GetNodeTags(
node_id=list(nodes),
tagname=tagnames,
fields=('node_id','tagname','value'))
-
+
unpack = operator.itemgetter('node_id','tagname','value')
for value in taginfo:
node, tagname, value = unpack(value)
if value and value.lower() != 'n/a':
- tags[tagname][int(node)] = float(value)
+ tags[tagname][node] = float(value)
for tagname, weight, default in self.RATE_FACTORS:
taginfo = tags[tagname % replacements].get
def fetch_node_info(self):
orig_attrs = {}
- self._api.StartMulticall()
- info = self._api.GetNodes(self._node_id)
- tags = self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value'))
- info, tags = self._api.FinishMulticall()
+ info, tags = self._sliceapi.GetNodeInfo(self._node_id)
info = info[0]
tags = dict( (t['tagname'],t['value'])
orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
self.min_num_external_ifaces = None
self.max_num_external_ifaces = None
- self.timeframe = 'm'
+ if not self._timeframe: self._timeframe = 'w'
- replacements = {'timeframe':self.timeframe}
+ replacements = {'timeframe':self._timeframe}
+
for attr, tag in self.BASEFILTERS.iteritems():
if tag in info:
value = info[tag]
if 'peer_id' in info:
orig_attrs['site'] = self.site
- self.site = self._api.peer_map[info['peer_id']]
+ self.site = self._sliceapi.peer_map[info['peer_id']]
if 'interface_ids' in info:
self.min_num_external_ifaces = \
orig_attrs['server_key'] = self.server_key
self.server_key = info['ssh_rsa_key']
- self.__orig_attrs = orig_attrs
+ self.hostip = socket.gethostbyname(self.hostname)
+
+ try:
+ self.__orig_attrs
+ except AttributeError:
+ self.__orig_attrs = orig_attrs
def validate(self):
if self.home_path is None:
RPM_FUSION_URL = self.RPM_FUSION_URL
rpmFusion = (
- 'sudo -S rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s'
+ 'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
) % {
'RPM_FUSION_URL' : RPM_FUSION_URL
}
)
if proc.wait():
+ if self.check_bad_host(out,err):
+ self.blacklist()
raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
# Launch p2p yum dependency installer
raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
# Ensure the node is clean (no apps running that could interfere with operations)
- if self.enable_cleanup:
- self.do_cleanup()
-
+ if self.enable_proc_cleanup:
+ self.do_proc_cleanup()
+ if self.enable_home_cleanup:
+ self.do_home_cleanup()
+
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
# Wait for the p2p installer
if self._yum_dependencies and not self._installed:
return False
def destroy(self):
- if self.enable_cleanup:
- self.do_cleanup()
+ if self.enable_proc_cleanup:
+ self.do_proc_cleanup()
def blacklist(self):
if self._node_id:
self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
import util
- util.appendBlacklist(self._node_id)
+ util.appendBlacklist(self.hostname)
- def do_cleanup(self):
+ def do_proc_cleanup(self):
if self.testbed().recovering:
# WOW - not now
return
- self._logger.info("Cleaning up %s", self.hostname)
+ self._logger.info("Cleaning up processes on %s", self.hostname)
cmds = [
"sudo -S killall python tcpdump || /bin/true ; "
retry = 3
)
proc.wait()
-
+
+ def do_home_cleanup(self):
+ # Remove leftover nepi-* working directories from the slice home
+ # on the node over SSH, so stale state from earlier experiments
+ # does not interfere with this run.
+ if self.testbed().recovering:
+ # WOW - not now
+ return
+
+ self._logger.info("Cleaning up home on %s", self.hostname)
+
+ cmds = [
+ "find . -maxdepth 1 -name 'nepi-*' -execdir rm -rf {} + "
+ ]
+
+ for cmd in cmds:
+ (out,err),proc = server.popen_ssh_command(
+ # Some apps need two kills
+ cmd % {
+ 'slicename' : self.slicename ,
+ },
+ host = self.hostname,
+ port = None,
+ user = self.slicename,
+ agent = None,
+ ident_key = self.ident_path,
+ server_key = self.server_key,
+ tty = True, # so that ps -N -T works as advertised...
+ timeout = 60,
+ retry = 3
+ )
+ proc.wait()
+
def prepare_dependencies(self):
# Configure p2p yum dependency installer
if self.required_packages and not self._installed:
if len(routes) > MAX_VROUTE_ROUTES:
return 'sliceip'
- vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
+ vsys_vnet = ipaddr.IPv4Network(vsys_vnet)
for route in routes:
- dest, prefix, nexthop, metric = route
- dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
+ dest, prefix, nexthop, metric, device = route
+ dest = ipaddr.IPv4Network("%s/%d" % (dest,prefix))
nexthop = ipaddr.IPAddress(nexthop)
if dest not in vsys_vnet or nexthop not in vsys_vnet:
return 'sliceip'
return 'vroute'
def format_route(self, route, dev, method, action):
- dest, prefix, nexthop, metric = route
+ dest, prefix, nexthop, metric, device = route
if method == 'vroute':
return (
"%s %s%s gw %s %s" % (
elif out or err:
logger.debug("%s said: %s%s", method, out, err)
+ def check_bad_host(self, out, err):
+ # Detect node-level failures (as opposed to application errors)
+ # in command output/stderr, so callers can blacklist the node:
+ # matches rpmfusion mirror DNS resolution failures and yum/rpm
+ # disk I/O errors. Case-insensitive; returns a truthy match
+ # object when either stream matches, else None.
+ badre = re.compile(r'(?:'
+ r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
+ r'|Error: disk I/O error'
+ r')',
+ re.I)
+ return badre.search(out) or badre.search(err)