import collections
import cStringIO
import resourcealloc
+import socket
+import sys
from nepi.util import server
+from nepi.util import parallel
+
class UnresponsiveNodeError(RuntimeError):
    """Raised when a PlanetLab node cannot be contacted (e.g. it stays
    unreachable past the provisioning-propagation timeout)."""
    pass
class Node(object):
BASEFILTERS = {
DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
+ RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
+ RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
def __init__(self, api=None):
if not api:
self.operatingSystem = None
self.pl_distro = None
self.site = None
- self.emulation = None
self.minReliability = None
self.maxReliability = None
self.minBandwidth = None
self.required_packages = set()
self.required_vsys = set()
self.pythonpath = []
+ self.rpmFusion = False
self.env = collections.defaultdict(list)
# Testbed-derived attributes
)
def find_candidates(self, filter_slice_id=None):
+ print >>sys.stderr, "Finding candidates for", self.make_filter_description()
+
fields = ('node_id',)
replacements = {'timeframe':self.timeframe}
# only pick healthy nodes
basefilters['run_level'] = 'boot'
basefilters['boot_state'] = 'boot'
+ basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
+ basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
# keyword-only "pseudofilters"
extra = {}
len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
candidates = set(filter(predicate, candidates))
+
+ # make sure hostnames are resolvable
+ if candidates:
+ print >>sys.stderr, " Found", len(candidates), "candidates. Checking for reachability..."
+
+ hostnames = dict(map(operator.itemgetter('node_id','hostname'),
+ self._api.GetNodes(list(candidates), ['node_id','hostname'])
+ ))
+ def resolvable(node_id):
+ try:
+ addr = socket.gethostbyname(hostnames[node_id])
+ return addr is not None
+ except:
+ return False
+ candidates = set(parallel.pfilter(resolvable, candidates,
+ maxthreads = 16))
+
+ print >>sys.stderr, " Found", len(candidates), "reachable candidates."
return candidates
self._node_id = node_id
self.fetch_node_info()
+ def unassign_node(self):
+ self._node_id = None
+ self.__dict__.update(self.__orig_attrs)
+
def fetch_node_info(self):
+ orig_attrs = {}
+
info = self._api.GetNodes(self._node_id)[0]
tags = dict( (t['tagname'],t['value'])
for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
+ orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
+ orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
self.min_num_external_ifaces = None
self.max_num_external_ifaces = None
self.timeframe = 'm'
for attr, tag in self.BASEFILTERS.iteritems():
if tag in info:
value = info[tag]
+ if hasattr(self, attr):
+ orig_attrs[attr] = getattr(self, attr)
setattr(self, attr, value)
for attr, (tag,_) in self.TAGFILTERS.iteritems():
tag = tag % replacements
if tag in tags:
value = tags[tag]
+ if hasattr(self, attr):
+ orig_attrs[attr] = getattr(self, attr)
setattr(self, attr, value)
if 'peer_id' in info:
+ orig_attrs['site'] = self.site
self.site = self._api.peer_map[info['peer_id']]
if 'interface_ids' in info:
self.max_num_external_ifaces = len(info['interface_ids'])
if 'ssh_rsa_key' in info:
+ orig_attrs['server_key'] = self.server_key
self.server_key = info['ssh_rsa_key']
+
+ self.__orig_attrs = orig_attrs
def validate(self):
if self.home_path is None:
pidfile = self.DEPENDS_PIDFILE
logfile = self.DEPENDS_LOGFILE
+ # If we need rpmfusion, we must install the repo definition and the gpg keys
+ if self.rpmFusion:
+ if self.operatingSystem == 'f12':
+ # Fedora 12 requires a different rpmfusion package
+ RPM_FUSION_URL = self.RPM_FUSION_URL_F12
+ else:
+ # This one works for f13+
+ RPM_FUSION_URL = self.RPM_FUSION_URL
+
+ rpmFusion = (
+ '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
+ ) % {
+ 'RPM_FUSION_URL' : RPM_FUSION_URL
+ }
+ else:
+ rpmFusion = ''
+
# Start process in a "daemonized" way, using nohup and heavy
# stdin/out redirection to avoid connection issues
(out,err),proc = rspawn.remote_spawn(
- "( yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
+ "( %(rpmfusion)s yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
'packages' : ' '.join(self.required_packages),
+ 'rpmfusion' : rpmFusion,
},
pidfile = pidfile,
stdout = logfile,
if proc.wait():
raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+ def wait_provisioning(self, timeout = 20*60):
+ # recently provisioned nodes may not be up yet
+ sleeptime = 1.0
+ totaltime = 0.0
+ while not self.is_alive():
+ time.sleep(sleeptime)
+ totaltime += sleeptime
+ sleeptime = min(30.0, sleeptime*1.5)
+
+ if totaltime > timeout:
+ # PlanetLab has a 15' delay on configuration propagation
+ # If we're above that delay, the unresponsiveness is not due
+ # to this delay.
+ raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
+
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
if self.required_packages:
pidfile = self.DEPENDS_PIDFILE
def is_alive(self):
# Make sure all the paths are created where
# they have to be created for deployment
- (out,err),proc = server.popen_ssh_command(
+ (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
"echo 'ALIVE'",
host = self.hostname,
port = None,
for dev in devs:
if dev.routes_here(route):
# Schedule rule
- dest, prefix, nexthop = route
+ dest, prefix, nexthop, metric = route
rules.append(
"add %s%s gw %s %s" % (
dest,
raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
"- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
+ print >>sys.stderr, "Setting up routes for", self.hostname
+
(out,err),proc = server.popen_ssh_command(
"( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
home = server.shell_escape(self.home_path)),