Source code for nepi.resources.planetlab.sfa_node

#
#    NEPI, a framework to manage network experiments
#    Copyright (C) 2013 INRIA
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License version 2 as
#    published by the Free Software Foundation;
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>

from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
        ResourceState
from nepi.resources.linux.node import LinuxNode
from nepi.util.sfaapi import SFAAPIFactory 
from nepi.util.execfuncs import lexec
from nepi.util import sshfuncs

from random import randint
import re
import os
import weakref
import time
import socket
import threading
import datetime

@clsinit_copy
[docs]class PlanetlabSfaNode(LinuxNode): _rtype = "planetlab::sfa::Node" _help = "Controls a PlanetLab host accessible using a SSH key " \ "and provisioned using SFA" _platform = "planetlab" @classmethod def _register_attributes(cls): sfa_user = Attribute("sfauser", "SFA user", flags = Flags.Credential) sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \ used to generate the user credential") city = Attribute("city", "Constrain location (city) during resource \ discovery. May use wildcards.", flags = Flags.Filter) country = Attribute("country", "Constrain location (country) during \ resource discovery. May use wildcards.", flags = Flags.Filter) region = Attribute("region", "Constrain location (region) during \ resource discovery. May use wildcards.", flags = Flags.Filter) architecture = Attribute("architecture", "Constrain architecture \ during resource discovery.", type = Types.Enumerate, allowed = ["x86_64", "i386"], flags = Flags.Filter) operating_system = Attribute("operatingSystem", "Constrain operating \ system during resource discovery.", type = Types.Enumerate, allowed = ["f8", "f12", "f14", "centos", "other"], flags = Flags.Filter) min_reliability = Attribute("minReliability", "Constrain reliability \ while picking PlanetLab nodes. Specifies a lower \ acceptable bound.", type = Types.Double, range = (1, 100), flags = Flags.Filter) max_reliability = Attribute("maxReliability", "Constrain reliability \ while picking PlanetLab nodes. Specifies an upper \ acceptable bound.", type = Types.Double, range = (1, 100), flags = Flags.Filter) min_bandwidth = Attribute("minBandwidth", "Constrain available \ bandwidth while picking PlanetLab nodes. \ Specifies a lower acceptable bound.", type = Types.Double, range = (0, 2**31), flags = Flags.Filter) max_bandwidth = Attribute("maxBandwidth", "Constrain available \ bandwidth while picking PlanetLab nodes. \ Specifies an upper acceptable bound.", type = Types.Double, range = (0, 2**31), flags = Flags.Filter) min_load = Attribute("minLoad", "Constrain node load average while \ picking PlanetLab nodes. Specifies a lower acceptable \ bound.", type = Types.Double, range = (0, 2**31), flags = Flags.Filter) max_load = Attribute("maxLoad", "Constrain node load average while \ picking PlanetLab nodes. Specifies an upper acceptable \ bound.", type = Types.Double, range = (0, 2**31), flags = Flags.Filter) min_cpu = Attribute("minCpu", "Constrain available cpu time while \ picking PlanetLab nodes. Specifies a lower acceptable \ bound.", type = Types.Double, range = (0, 100), flags = Flags.Filter) max_cpu = Attribute("maxCpu", "Constrain available cpu time while \ picking PlanetLab nodes. Specifies an upper acceptable \ bound.", type = Types.Double, range = (0, 100), flags = Flags.Filter) timeframe = Attribute("timeframe", "Past time period in which to check\ information about the node. Values are year,month, \ week, latest", default = "week", type = Types.Enumerate, allowed = ["latest", "week", "month", "year"], flags = Flags.Filter) plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \ in the user's home directory under .nepi directory. This file \ contains a list of PL nodes to blacklist, and at the end \ of the experiment execution the new blacklisted nodes are added.", type = Types.Bool, default = False, flags = Flags.Global) cls._register_attribute(sfa_user) cls._register_attribute(sfa_private_key) cls._register_attribute(city) cls._register_attribute(country) cls._register_attribute(region) cls._register_attribute(architecture) cls._register_attribute(operating_system) cls._register_attribute(min_reliability) cls._register_attribute(max_reliability) cls._register_attribute(min_bandwidth) cls._register_attribute(max_bandwidth) cls._register_attribute(min_load) cls._register_attribute(max_load) cls._register_attribute(min_cpu) cls._register_attribute(max_cpu) cls._register_attribute(timeframe) cls._register_attribute(plblacklist) def __init__(self, ec, guid): super(PlanetlabSfaNode, self).__init__(ec, guid) self._ecobj = weakref.ref(ec) self._sfaapi = None self._node_to_provision = None self._slicenode = False self._hostname = False if self.get("gateway") or self.get("gatewayUser"): self.set("gateway", None) self.set("gatewayUser", None) # Blacklist file for PL nodes nepi_home = os.path.join(os.path.expanduser("~"), ".nepi") plblacklist_file = os.path.join(nepi_home, "plblacklist.txt") if not os.path.exists(plblacklist_file): if os.path.isdir(nepi_home): open(plblacklist_file, 'w').close() else: os.makedirs(nepi_home) open(plblacklist_file, 'w').close() def _skip_provision(self): sfa_user = self.get("sfauser") if not sfa_user: return True else: return False @property
[docs] def sfaapi(self): """ Property to instanciate the SFA API based in sfi client. For each SFA method called this instance is used. """ if not self._sfaapi: sfa_user = self.get("sfauser") sfa_sm = "http://sfa3.planet-lab.eu:12346/" sfa_auth = '.'.join(sfa_user.split('.')[:2]) sfa_registry = "http://sfa3.planet-lab.eu:12345/" sfa_private_key = self.get("sfaPrivateKey") _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, sfa_registry, sfa_sm, sfa_private_key, self._ecobj()) if not _sfaapi: self.fail_sfaapi() self._sfaapi = weakref.ref(_sfaapi) return self._sfaapi()
[docs] def do_discover(self): """ Based on the attributes defined by the user, discover the suitable nodes for provision. """ if self._skip_provision(): super(PlanetlabSfaNode, self).do_discover() return nodes = self.sfaapi.get_resources_hrn() hostname = self._get_hostname() if hostname: # the user specified one particular node to be provisioned self._hostname = True host_hrn = nodes[hostname] # check that the node is not blacklisted or being provisioned # by other RM if not self._blacklisted(host_hrn) and not self._reserved(host_hrn): # Node in reservation ping_ok = self._do_ping(hostname) if not ping_ok: self._blacklist_node(host_hrn) self.fail_node_not_alive(hostname) else: if self._check_if_in_slice([host_hrn]): self.debug("The node %s is already in the slice" % hostname) self._slicenode = True self._node_to_provision = host_hrn else: self.fail_node_not_available(hostname) super(PlanetlabSfaNode, self).do_discover() else: hosts_hrn = nodes.values() nodes_inslice = self._check_if_in_slice(hosts_hrn) nodes_not_inslice = list(set(hosts_hrn) - set(nodes_inslice)) host_hrn = None if nodes_inslice: host_hrn = self._choose_random_node(nodes, nodes_inslice) self._slicenode = True if not host_hrn: # Either there were no matching nodes in the user's slice, or # the nodes in the slice were blacklisted or being provisioned # by other RM. Note nodes_not_inslice is never empty host_hrn = self._choose_random_node(nodes, nodes_not_inslice) self._slicenode = False if host_hrn: self._node_to_provision = host_hrn try: self._set_hostname_attr(host_hrn) self.info(" Selected node to provision ") super(PlanetlabSfaNode, self).do_discover() except: self._blacklist_node(host_hrn) self.do_discover() else: self.fail_not_enough_nodes()
def _blacklisted(self, host_hrn): """ Check in the SFA API that the node is not in the blacklist. """ if self.sfaapi.blacklisted(host_hrn): return True return False def _reserved(self, host_hrn): """ Check in the SFA API that the node is not in the reserved list. """ if self.sfaapi.reserved(host_hrn): return True return False
[docs] def do_provision(self): """ Add node to user's slice and verifing that the node is functioning correctly. Check ssh, file system. """ if self._skip_provision(): super(PlanetlabSfaNode, self).do_provision() return provision_ok = False ssh_ok = False proc_ok = False timeout = 1800 while not provision_ok: node = self._node_to_provision if not self._slicenode: self._add_node_to_slice(node) # check ssh connection t = 0 while t < timeout and not ssh_ok: cmd = 'echo \'GOOD NODE\'' ((out, err), proc) = self.execute(cmd) if out.find("GOOD NODE") < 0: self.debug( "No SSH connection, waiting 60s" ) t = t + 60 time.sleep(60) continue else: self.debug( "SSH OK" ) ssh_ok = True continue else: cmd = 'echo \'GOOD NODE\'' ((out, err), proc) = self.execute(cmd) if not out.find("GOOD NODE") < 0: ssh_ok = True if not ssh_ok: # the timeout was reach without establishing ssh connection # the node is blacklisted, deleted from the slice, and a new # node to provision is discovered self.warning(" Could not SSH login ") self._blacklist_node(node) self.do_discover() continue # check /proc directory is mounted (ssh_ok = True) # and file system is not read only else: cmd = 'mount |grep proc' ((out1, err1), proc1) = self.execute(cmd) cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile' ((out2, err2), proc2) = self.execute(cmd) if out1.find("/proc type proc") < 0 or \ "Read-only file system".lower() in err2.lower(): self.warning(" Corrupted file system ") self._blacklist_node(node) self.do_discover() continue else: provision_ok = True if not self.get('hostname'): self._set_hostname_attr(node) self.info(" Node provisioned ") super(PlanetlabSfaNode, self).do_provision()
[docs] def do_release(self): super(PlanetlabSfaNode, self).do_release() if self.state == ResourceState.RELEASED and not self._skip_provision(): self.debug(" Releasing SFA API ") self.sfaapi.release() # def _filter_based_on_attributes(self): # """ # Retrive the list of nodes hrn that match user's constraints # """ # # Map user's defined attributes with tagnames of PlanetLab # timeframe = self.get("timeframe")[0] # attr_to_tags = { # 'city' : 'city', # 'country' : 'country', # 'region' : 'region', # 'architecture' : 'arch', # 'operatingSystem' : 'fcdistro', # 'minReliability' : 'reliability%s' % timeframe, # 'maxReliability' : 'reliability%s' % timeframe, # 'minBandwidth' : 'bw%s' % timeframe, # 'maxBandwidth' : 'bw%s' % timeframe, # 'minLoad' : 'load%s' % timeframe, # 'maxLoad' : 'load%s' % timeframe, # 'minCpu' : 'cpu%s' % timeframe, # 'maxCpu' : 'cpu%s' % timeframe, # } # # nodes_hrn = [] # filters = {} # # for attr_name, attr_obj in self._attrs.iteritems(): # attr_value = self.get(attr_name) # # if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \ # attr_name != 'timeframe': # # attr_tag = attr_to_tags[attr_name] # filters['tagname'] = attr_tag # # # filter nodes by fixed constraints e.g. operating system # if not 'min' in attr_name and not 'max' in attr_name: # filters['value'] = attr_value # nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn) # # # filter nodes by range constraints e.g. max bandwidth # elif ('min' or 'max') in attr_name: # nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn) # # if not filters: # nodes = self.sfaapi.get_resources_hrn() # for node in nodes: # nodes_hrn.append(node[node.key()]) # return nodes_hrn # # def _filter_by_fixed_attr(self, filters, nodes_hrn): # """ # Query SFA API for nodes matching fixed attributes defined by the # user # """ # pass ## node_tags = self.sfaapi.get_resources_tags(filters) ## if node_tags is not None: ## ## if len(nodes_id) == 0: ## # first attribute being matched ## for node_tag in node_tags: ## nodes_id.append(node_tag['node_id']) ## else: ## # remove the nodes ids that don't match the new attribute ## # that is being match ## ## nodes_id_tmp = [] ## for node_tag in node_tags: ## if node_tag['node_id'] in nodes_id: ## nodes_id_tmp.append(node_tag['node_id']) ## ## if len(nodes_id_tmp): ## nodes_id = set(nodes_id) & set(nodes_id_tmp) ## else: ## # no node from before match the new constraint ## self.fail_discovery() ## else: ## # no nodes match the filter applied ## self.fail_discovery() ## ## return nodes_id # # def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id): # """ # Query PLCAPI for nodes ids matching attributes defined in a certain # range, by the user # """ # pass ## node_tags = self.plapi.get_node_tags(filters) ## if node_tags: ## ## if len(nodes_id) == 0: ## # first attribute being matched ## for node_tag in node_tags: ## ## # check that matches the min or max restriction ## if 'min' in attr_name and node_tag['value'] != 'n/a' and \ ## float(node_tag['value']) > attr_value: ## nodes_id.append(node_tag['node_id']) ## ## elif 'max' in attr_name and node_tag['value'] != 'n/a' and \ ## float(node_tag['value']) < attr_value: ## nodes_id.append(node_tag['node_id']) ## else: ## ## # remove the nodes ids that don't match the new attribute ## # that is being match ## nodes_id_tmp = [] ## for node_tag in node_tags: ## ## # check that matches the min or max restriction and was a ## # matching previous filters ## if 'min' in attr_name and node_tag['value'] != 'n/a' and \ ## float(node_tag['value']) > attr_value and \ ## node_tag['node_id'] in nodes_id: ## nodes_id_tmp.append(node_tag['node_id']) ## ## elif 'max' in attr_name and node_tag['value'] != 'n/a' and \ ## float(node_tag['value']) < attr_value and \ ## node_tag['node_id'] in nodes_id: ## nodes_id_tmp.append(node_tag['node_id']) ## ## if len(nodes_id_tmp): ## nodes_id = set(nodes_id) & set(nodes_id_tmp) ## else: ## # no node from before match the new constraint ## self.fail_discovery() ## ## else: #TODO CHECK ## # no nodes match the filter applied ## self.fail_discovery() ## ## return nodes_id
def _choose_random_node(self, nodes, hosts_hrn): """ From the possible nodes for provision, choose randomly to decrese the probability of different RMs choosing the same node for provision """ size = len(hosts_hrn) while size: size = size - 1 index = randint(0, size) host_hrn = hosts_hrn[index] hosts_hrn[index] = hosts_hrn[size] # check the node is not blacklisted or being provision by other RM # and perform ping to check that is really alive if not self._blacklisted(host_hrn): if not self._reserved(host_hrn): print self.sfaapi._reserved ,self.guid for hostname, hrn in nodes.iteritems(): if host_hrn == hrn: print 'hostname' ,hostname ping_ok = self._do_ping(hostname) if not ping_ok: self._set_hostname_attr(hostname) self.warning(" Node not responding PING ") self._blacklist_node(host_hrn) else: # discovered node for provision, added to provision list self._node_to_provision = host_hrn return host_hrn # def _get_nodes_id(self, filters=None): # return self.plapi.get_nodes(filters, fields=['node_id']) # def _add_node_to_slice(self, host_hrn): """ Add node to slice, using SFA API. """ self.info(" Adding node to slice ") slicename = self.get("username").replace('_', '.') slicename = 'ple.' + slicename self.sfaapi.add_resource_to_slice(slicename, host_hrn) def _delete_from_slice(self): """ Delete every node from slice, using SFA API. Sfi client doesn't work for particular node urns. """ self.warning(" Deleting node from slice ") slicename = self.get("username").replace('_', '.') slicename = 'ple.' + slicename self.sfaapi.remove_all_from_slice(slicename) def _get_hostname(self): """ Get the attribute hostname. """ hostname = self.get("hostname") if hostname: return hostname else: return None def _set_hostname_attr(self, node): """ Query SFAAPI for the hostname of a certain host hrn and sets the attribute hostname, it will over write the previous value. """ hosts_hrn = self.sfaapi.get_resources_hrn() for hostname, hrn in hosts_hrn.iteritems(): if hrn == node: self.set("hostname", hostname) def _check_if_in_slice(self, hosts_hrn): """ Check using SFA API if any host hrn from hosts_hrn is in the user's slice. """ slicename = self.get("username").replace('_', '.') slicename = 'ple.' + slicename slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource'] if slice_nodes: slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values() else: slice_nodes_hrn = [] nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn)) return nodes_inslice def _do_ping(self, hostname): """ Perform ping command on node's IP matching hostname. """ ping_ok = False ip = self._get_ip(hostname) if ip: command = "ping -c4 %s" % ip (out, err) = lexec(command) m = re.search("(\d+)% packet loss", str(out)) if m and int(m.groups()[0]) < 50: ping_ok = True return ping_ok def _blacklist_node(self, host_hrn): """ Add mal functioning node to blacklist (in SFA API). """ self.warning(" Blacklisting malfunctioning node ") self.sfaapi.blacklist_resource(host_hrn) if not self._hostname: self.set('hostname', None) def _reserve(self, host_hrn): """ Add node to the list of nodes being provisioned, in order for other RMs to not try to provision the same one again. """ self.sfaapi.reserve_resource(host_hrn) def _get_ip(self, hostname): """ Query cache for the IP of a node with certain hostname """ try: ip = sshfuncs.gethostbyname(hostname) except: # Fail while trying to find the IP return None return ip
[docs] def fail_discovery(self): msg = "Discovery failed. No candidates found for node" self.error(msg) raise RuntimeError, msg
[docs] def fail_node_not_alive(self, hostname=None): msg = "Node %s not alive" % hostname raise RuntimeError, msg
[docs] def fail_node_not_available(self, hostname): msg = "Node %s not available for provisioning" % hostname raise RuntimeError, msg
[docs] def fail_not_enough_nodes(self): msg = "Not enough nodes available for provisioning" raise RuntimeError, msg
[docs] def fail_sfaapi(self): msg = "Failing while trying to instanciate the SFA API.\nSet the" + \ " attributes sfauser and sfaPrivateKey." raise RuntimeError, msg
[docs] def valid_connection(self, guid): # TODO: Validate! return True