#
# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author: Lucia Guevgeozian
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
ResourceState, reschedule_delay
from nepi.resources.linux.node import LinuxNode
from nepi.util.sfaapi import SFAAPIFactory
from nepi.util.execfuncs import lexec
from nepi.util import sshfuncs
from random import randint
import time
import re
import weakref
import socket
import threading
import datetime
@clsinit_copy
class WilabtSfaNode(LinuxNode):
    """Linux node hosted on the w-iLab.t testbed and provisioned through SFA.

    When the ``sfauser`` attribute is not set, discovery and provisioning
    are delegated unchanged to the parent class.  Otherwise the node named
    by the ``hostname`` attribute is looked up through the SFA API, added
    to the slice given by ``slicename`` and health-checked over SSH before
    the parent provisioning step runs.
    """

    _rtype = "WilabtSfaNode"
    _help = "Controls a Wilabt host accessible using a SSH key " \
            "and provisioned using SFA"
    _backend = "omf"

    # DNS suffix appended to the short testbed hostname.
    _WILABT_DOMAIN = '.wilab2.ilabt.iminds.be'

    @classmethod
    def _register_attributes(cls):
        """Register the SFA and gateway attributes of this resource."""
        sfa_user = Attribute("sfauser", "SFA user",
                             flags = Flags.Credential)

        sfa_private_key = Attribute(
            "sfaPrivateKey",
            "SFA path to the private key "
            "used to generate the user credential",
            flags = Flags.Credential)

        slicename = Attribute("slicename", "SFA slice for the experiment",
                              flags = Flags.Credential)

        gateway_user = Attribute("gatewayUser", "Gateway account username",
                                 flags = Flags.Design)

        gateway = Attribute("gateway", "Hostname of the gateway machine",
                            flags = Flags.Design)

        cls._register_attribute(sfa_user)
        cls._register_attribute(sfa_private_key)
        cls._register_attribute(slicename)
        cls._register_attribute(gateway_user)
        cls._register_attribute(gateway)

    def __init__(self, ec, guid):
        super(WilabtSfaNode, self).__init__(ec, guid)

        # Weak reference to the experiment controller passed in as ``ec``.
        self._ecobj = weakref.ref(ec)
        # Weak reference to the SFA API object, created lazily by ``sfaapi``.
        self._sfaapi = None
        # HRN of the node selected by do_discover().
        self._node_to_provision = None
        # True when the selected node was already in the slice at discovery.
        self._slicenode = False
        # True when the user asked for one particular hostname.
        self._hostname = False
        # Login name read from the slice description (see _get_username).
        self._username = None

    def _skip_provision(self):
        """Return True when no ``sfauser`` is set, i.e. SFA is not used."""
        return not self.get("sfauser")

    @property
    def sfaapi(self):
        """SFA API object for this node, created on first access.

        Raises RuntimeError (through fail_sfaapi) when the factory does not
        return an API object.
        """
        if not self._sfaapi:
            sfa_user = self.get("sfauser")
            sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
            # The authority is the first two dot-separated labels of the user.
            sfa_auth = '.'.join(sfa_user.split('.')[:2])
            sfa_registry = "http://sfa3.planet-lab.eu:12345/"
            sfa_private_key = self.get("sfaPrivateKey")
            batch = True

            _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
                sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch,
                WilabtSfaNode._rtype)

            if not _sfaapi:
                self.fail_sfaapi()

            self._sfaapi = weakref.ref(_sfaapi)

        return self._sfaapi()

    def do_discover(self):
        """
        Based on the attributes defined by the user, discover the suitable
        nodes for provision.

        Raises RuntimeError when the requested hostname is unknown to the
        SFA API, is blacklisted, or is already reserved.
        """
        if self._skip_provision():
            super(WilabtSfaNode, self).do_discover()
            return

        nodes = self.sfaapi.get_resources_hrn()

        hostname = self._get_hostname()
        if hostname:
            # the user specified one particular node to be provisioned
            self._hostname = True
            if hostname not in nodes:
                # Report an unknown hostname as a discovery failure rather
                # than letting a bare KeyError escape.
                self.fail_discovery()
            host_hrn = nodes[hostname]

            # check that the node is not blacklisted or being provisioned
            # by other RM (both helpers raise when the node is unavailable)
            if not self._blacklisted(host_hrn) and \
                    not self._reserved(host_hrn):
                if self._check_if_in_slice([host_hrn]):
                    self.debug("Node already in slice %s" % host_hrn)
                    self._slicenode = True
                hostname = hostname + self._WILABT_DOMAIN
                self.set('hostname', hostname)
                self._node_to_provision = host_hrn

        super(WilabtSfaNode, self).do_discover()

    def do_provision(self):
        """
        Add node to user's slice after verifying that the node is functioning
        correctly.

        Loops until a node passes the SSH, file system, omf_rc and hostname
        checks; after a failed check do_discover() is run again.
        """
        if self._skip_provision():
            super(WilabtSfaNode, self).do_provision()
            return

        provision_ok = False
        timeout = 300

        while not provision_ok:
            node = self._node_to_provision
            if self._slicenode:
                self._delete_from_slice()
                self.debug("Waiting 300 seg for re-adding to slice")
                time.sleep(300) # Timeout for the testbed to allow a new reservation
            self._add_node_to_slice(node)

            t = 0
            while not self._check_if_in_slice([node]) and t < timeout:
                t = t + 5
                # NOTE(review): the pause grows with t (5, 10, 15, ...), so
                # the total wait is much longer than ``timeout`` seconds.
                # Timing is left as it was; only the log now reports it.
                time.sleep(t)
                self.debug("Waiting %d seg for resources to be added" % t)

            self._get_username()
            ssh_ok = self._check_ssh_loop()

            if not ssh_ok:
                # the timeout was reached without establishing ssh connection
                # the node is blacklisted, and a new
                # node to provision is discovered
                self._blacklist_node(node)
                self.do_discover()
                continue

            # ssh works: check /proc directory is mounted, the omf_rc
            # process is up and the hostname is correct
            if not self._check_fs():
                self.do_discover()
                continue
            if not self._check_omfrc():
                self.do_discover()
                continue
            if not self._check_hostname():
                self.do_discover()
                continue

            provision_ok = True
            if not self.get('hostname'):
                self._set_hostname_attr(node)
            self.info(" Node provisioned ")

        super(WilabtSfaNode, self).do_provision()

    def _blacklisted(self, host_hrn):
        """Return False if host_hrn is not blacklisted, raise otherwise."""
        if self.sfaapi.blacklisted(host_hrn):
            self.fail_node_not_available(host_hrn)
        return False

    def _reserved(self, host_hrn):
        """Return False if host_hrn is not reserved, raise otherwise."""
        if self.sfaapi.reserved(host_hrn):
            self.fail_node_not_available(host_hrn)
        return False

    def _get_username(self):
        """
        Read the login username from the slice description, once, and store
        it in the ``username`` attribute.
        """
        slicename = self.get("slicename")
        if self._username is None:
            slice_info = self.sfaapi.get_slice_resources(slicename)
            username = slice_info['resource'][0]['services'][0]['login'][0]['username']
            self.set('username', username)
            self.debug("Retriving username information from RSpec %s" % username)
            self._username = username

    def _check_ssh_loop(self):
        """
        Try to run a trivial command on the node until it succeeds or the
        retry budget is spent.  Return True when the command output came back.
        """
        t = 0
        timeout = 10
        interval = 5
        ssh_ok = False
        cmd = 'echo \'GOOD NODE\''
        while t < timeout and not ssh_ok:
            ((out, err), proc) = self.execute(cmd)
            if out.find("GOOD NODE") < 0:
                # The message used to announce 60s while sleeping 5s.
                self.debug("No SSH connection, waiting %ds" % interval)
                t = t + interval
                time.sleep(interval)
            else:
                self.debug("SSH OK")
                ssh_ok = True
        return ssh_ok

    def _check_fs(self):
        """
        Check that /proc is mounted on the node.  On failure the node being
        provisioned is blacklisted and False is returned.
        """
        cmd = 'mount |grep proc'
        ((out, err), proc) = self.execute(cmd)
        if out.find("/proc type proc") < 0:
            self.warning(" Corrupted file system ")
            # The original referenced an undefined local ``node`` here.
            self._blacklist_node(self._node_to_provision)
            return False
        return True

    def _check_omfrc(self):
        """Return True when the omf_rc executable shows up in ``ps aux``."""
        cmd = 'ps aux|grep omf'
        ((out, err), proc) = self.execute(cmd)
        if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
            return False
        return True

    def _check_hostname(self):
        """Return False when the node reports a hostname containing 'localhost'."""
        cmd = 'hostname'
        ((out, err), proc) = self.execute(cmd)
        if 'localhost' in out.lower():
            return False
        return True

    def _add_node_to_slice(self, host_hrn):
        """Ask the SFA API to add host_hrn to the user's slice."""
        self.info(" Adding node to slice ")
        slicename = self.get("slicename")
        self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn)

    def _delete_from_slice(self):
        """Ask the SFA API to remove every resource from the user's slice."""
        self.warning(" Deleting all slivers from slice ")
        slicename = self.get("slicename")
        self.sfaapi.remove_all_from_slice(slicename)

    def _get_hostname(self):
        """Return the ``hostname`` attribute, or None when it is empty."""
        hostname = self.get("hostname")
        if hostname:
            return hostname
        return None

    def _set_hostname_attr(self, node):
        """
        Query SFAAPI for the hostname of a certain host hrn and set the
        attribute hostname; it will overwrite the previous value.
        """
        hosts_hrn = self.sfaapi.get_resources_hrn()
        # items() instead of iteritems() so this runs on Python 2 and 3.
        for name, hrn in hosts_hrn.items():
            if hrn == node:
                self.set("hostname", name + self._WILABT_DOMAIN)

    def _check_if_in_slice(self, hosts_hrn):
        """
        Check using SFA API if any host hrn from hosts_hrn is in the user's
        slice.  Return the list of those that are (empty when none).
        """
        slicename = self.get("slicename")
        slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
        # Default to "nothing in the slice" so every path binds the name.
        slice_nodes_hrn = []
        if slice_nodes and len(slice_nodes[0]['services']) != 0:
            slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
        nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
        return nodes_inslice

    def _do_ping(self, hostname):
        """
        Ping the node from the gateway machine over ssh.  Return True when
        the reported packet loss is below 50%.
        """
        ping_ok = False
        guser = self.get("gatewayUser")
        gw = self.get("gateway")
        host = hostname + self._WILABT_DOMAIN
        command = "ssh %s@%s 'ping -c4 %s'" % (guser, gw, host)
        (out, err) = lexec(command)
        m = re.search(r"(\d+)% packet loss", str(out))
        if m and int(m.group(1)) < 50:
            ping_ok = True

        return ping_ok

    def _blacklist_node(self, host_hrn):
        """
        Add a malfunctioning node to the blacklist and reset the hostname
        attribute: to None when the user did not pick a hostname, otherwise
        to the last dot-separated label of host_hrn.
        """
        self.warning(" Blacklisting malfunctioning node ")
        self.sfaapi.blacklist_resource(host_hrn)
        if not self._hostname:
            self.set('hostname', None)
        else:
            self.set('hostname', host_hrn.split('.')[-1])

    def _put_node_in_provision(self, host_hrn):
        """
        Add node to the list of nodes being provisioned, in order for other RMs
        to not try to provision the same one again
        """
        self.sfaapi.reserve_resource(host_hrn)

    def _get_ip(self, hostname):
        """
        Resolve hostname to an IP address; return None when resolution fails.
        """
        try:
            ip = sshfuncs.gethostbyname(hostname)
        except Exception:
            # Best effort: failing to find the IP is not fatal here
            return None

        return ip

    def fail_discovery(self):
        """Log and raise RuntimeError for a failed discovery."""
        msg = "Discovery failed. No candidates found for node"
        self.error(msg)
        raise RuntimeError(msg)

    def fail_node_not_alive(self, hostname=None):
        """Raise RuntimeError reporting that hostname is not alive."""
        msg = "Node %s not alive" % hostname
        raise RuntimeError(msg)

    def fail_node_not_available(self, hostname):
        """Raise RuntimeError reporting that hostname cannot be provisioned."""
        msg = "Node %s not available for provisioning" % hostname
        raise RuntimeError(msg)

    def fail_not_enough_nodes(self):
        """Raise RuntimeError reporting a lack of available nodes."""
        msg = "Not enough nodes available for provisioning"
        raise RuntimeError(msg)

    def fail_plapi(self):
        """Raise RuntimeError for a PLC API failure (kept for compatibility)."""
        msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
            " attributes pluser and plpassword."
        raise RuntimeError(msg)

    def fail_sfaapi(self):
        """Raise RuntimeError when the SFA API object cannot be created."""
        msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
            " attributes sfauser and sfaPrivateKey."
        raise RuntimeError(msg)

    def valid_connection(self, guid):
        """Accept every connection (no validation is implemented yet)."""
        # TODO: Validate!
        return True