--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation;
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
+
+from __future__ import print_function
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState
+from nepi.resources.linux.node import LinuxNode
+from nepi.util.sfaapi import SFAAPIFactory
+from nepi.util.execfuncs import lexec
+from nepi.util import sshfuncs
+
+from random import randint
+import re
+import os
+import weakref
+import time
+import socket
+import threading
+import datetime
+
+@clsinit_copy
+class PlanetlabSfaNode(LinuxNode):
+    """
+    Linux node hosted on PlanetLab, discovered and provisioned through
+    SFA (sfi client) instead of the PLC API.
+    """
+    # RM type name used in experiment descriptions.
+    _rtype = "planetlab::sfa::Node"
+    _help = "Controls a PlanetLab host accessible using a SSH key " \
+        "and provisioned using SFA"
+    _platform = "planetlab"
+
+    @classmethod
+    def _register_attributes(cls):
+        """
+        Register the attributes of this RM: the SFA credentials plus the
+        filters used to constrain node discovery (location, architecture,
+        OS, reliability, bandwidth, load, cpu, timeframe, blacklist).
+
+        NOTE(review): several help strings use backslash continuations
+        inside the literal, so they embed runs of spaces; kept as-is.
+        """
+
+        sfa_user = Attribute("sfauser", "SFA user",
+                    flags = Flags.Credential)
+
+        sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
+                            used to generate the user credential")
+
+        # Location filters; wildcards allowed.
+        city = Attribute("city", "Constrain location (city) during resource \
+                discovery. May use wildcards.",
+                flags = Flags.Filter)
+
+        country = Attribute("country", "Constrain location (country) during \
+                    resource discovery. May use wildcards.",
+                    flags = Flags.Filter)
+
+        region = Attribute("region", "Constrain location (region) during \
+                    resource discovery. May use wildcards.",
+                    flags = Flags.Filter)
+
+        architecture = Attribute("architecture", "Constrain architecture \
+                        during resource discovery.",
+                        type = Types.Enumerate,
+                        allowed = ["x86_64",
+                                   "i386"],
+                        flags = Flags.Filter)
+
+        operating_system = Attribute("operatingSystem", "Constrain operating \
+                            system during resource discovery.",
+                            type = Types.Enumerate,
+                            allowed = ["f8",
+                                       "f12",
+                                       "f14",
+                                       "centos",
+                                       "other"],
+                            flags = Flags.Filter)
+
+        # Numeric range filters (bounds evaluated over the selected timeframe).
+        min_reliability = Attribute("minReliability", "Constrain reliability \
+                            while picking PlanetLab nodes. Specifies a lower \
+                            acceptable bound.",
+                            type = Types.Double,
+                            range = (1, 100),
+                            flags = Flags.Filter)
+
+        max_reliability = Attribute("maxReliability", "Constrain reliability \
+                            while picking PlanetLab nodes. Specifies an upper \
+                            acceptable bound.",
+                            type = Types.Double,
+                            range = (1, 100),
+                            flags = Flags.Filter)
+
+        min_bandwidth = Attribute("minBandwidth", "Constrain available \
+                            bandwidth while picking PlanetLab nodes. \
+                            Specifies a lower acceptable bound.",
+                            type = Types.Double,
+                            range = (0, 2**31),
+                            flags = Flags.Filter)
+
+        max_bandwidth = Attribute("maxBandwidth", "Constrain available \
+                            bandwidth while picking PlanetLab nodes. \
+                            Specifies an upper acceptable bound.",
+                            type = Types.Double,
+                            range = (0, 2**31),
+                            flags = Flags.Filter)
+
+        min_load = Attribute("minLoad", "Constrain node load average while \
+                        picking PlanetLab nodes. Specifies a lower acceptable \
+                        bound.",
+                        type = Types.Double,
+                        range = (0, 2**31),
+                        flags = Flags.Filter)
+
+        max_load = Attribute("maxLoad", "Constrain node load average while \
+                        picking PlanetLab nodes. Specifies an upper acceptable \
+                        bound.",
+                        type = Types.Double,
+                        range = (0, 2**31),
+                        flags = Flags.Filter)
+
+        min_cpu = Attribute("minCpu", "Constrain available cpu time while \
+                        picking PlanetLab nodes. Specifies a lower acceptable \
+                        bound.",
+                        type = Types.Double,
+                        range = (0, 100),
+                        flags = Flags.Filter)
+
+        max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
+                        picking PlanetLab nodes. Specifies an upper acceptable \
+                        bound.",
+                        type = Types.Double,
+                        range = (0, 100),
+                        flags = Flags.Filter)
+
+        timeframe = Attribute("timeframe", "Past time period in which to check\
+                        information about the node. Values are year,month, \
+                        week, latest",
+                        default = "week",
+                        type = Types.Enumerate,
+                        allowed = ["latest",
+                                   "week",
+                                   "month",
+                                   "year"],
+                        flags = Flags.Filter)
+
+        # Persist the blacklist to ~/.nepi/plblacklist.txt across runs.
+        plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
+                        in the user's home directory under .nepi directory. This file \
+                        contains a list of PL nodes to blacklist, and at the end \
+                        of the experiment execution the new blacklisted nodes are added.",
+                    type = Types.Bool,
+                    default = False,
+                    flags = Flags.Global)
+
+        cls._register_attribute(sfa_user)
+        cls._register_attribute(sfa_private_key)
+        cls._register_attribute(city)
+        cls._register_attribute(country)
+        cls._register_attribute(region)
+        cls._register_attribute(architecture)
+        cls._register_attribute(operating_system)
+        cls._register_attribute(min_reliability)
+        cls._register_attribute(max_reliability)
+        cls._register_attribute(min_bandwidth)
+        cls._register_attribute(max_bandwidth)
+        cls._register_attribute(min_load)
+        cls._register_attribute(max_load)
+        cls._register_attribute(min_cpu)
+        cls._register_attribute(max_cpu)
+        cls._register_attribute(timeframe)
+        cls._register_attribute(plblacklist)
+
+ def __init__(self, ec, guid):
+ super(PlanetlabSfaNode, self).__init__(ec, guid)
+
+ self._ecobj = weakref.ref(ec)
+ self._sfaapi = None
+ self._node_to_provision = None
+ self._slicenode = False
+ self._hostname = False
+
+ if self.get("gateway") or self.get("gatewayUser"):
+ self.set("gateway", None)
+ self.set("gatewayUser", None)
+
+ # Blacklist file for PL nodes
+ nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
+ plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
+ if not os.path.exists(plblacklist_file):
+ if os.path.isdir(nepi_home):
+ with open(plblacklist_file, 'w') as clear:
+ pass
+ else:
+ os.makedirs(nepi_home)
+ with open(plblacklist_file, 'w') as clear:
+ pass
+
+ def _skip_provision(self):
+ sfa_user = self.get("sfauser")
+ if not sfa_user:
+ return True
+ else: return False
+
+    @property
+    def sfaapi(self):
+        """
+        Lazily instantiate and cache the SFA API (based on the sfi client).
+        Every SFA call made by this RM goes through this property.
+
+        Calls fail_sfaapi (which raises) when the API cannot be built,
+        e.g. when sfauser/sfaPrivateKey are not set.
+        """
+        if not self._sfaapi:
+            sfa_user = self.get("sfauser")
+            # Hard-coded PLE aggregate (sm) and registry endpoints.
+            sfa_sm = "http://sfa3.planet-lab.eu:12346/"
+            # The authority is the first two dotted fields of the user hrn.
+            sfa_auth = '.'.join(sfa_user.split('.')[:2])
+            sfa_registry = "http://sfa3.planet-lab.eu:12345/"
+            sfa_private_key = self.get("sfaPrivateKey")
+
+            _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
+                sfa_registry, sfa_sm, sfa_private_key, self._ecobj())
+
+            if not _sfaapi:
+                self.fail_sfaapi()
+
+            # NOTE(review): only a weak reference is cached; this assumes
+            # SFAAPIFactory keeps a strong reference to the instance,
+            # otherwise self._sfaapi() could return None -- confirm.
+            self._sfaapi = weakref.ref(_sfaapi)
+
+        return self._sfaapi()
+
+ def do_discover(self):
+ """
+ Based on the attributes defined by the user, discover the suitable
+ nodes for provision.
+ """
+ if self._skip_provision():
+ super(PlanetlabSfaNode, self).do_discover()
+ return
+
+ nodes = self.sfaapi.get_resources_hrn()
+
+ hostname = self._get_hostname()
+ if hostname:
+ # the user specified one particular node to be provisioned
+ self._hostname = True
+ host_hrn = nodes[hostname]
+
+ # check that the node is not blacklisted or being provisioned
+ # by other RM
+ if not self._blacklisted(host_hrn) and not self._reserved(host_hrn):
+ # Node in reservation
+ ping_ok = self._do_ping(hostname)
+ if not ping_ok:
+ self._blacklist_node(host_hrn)
+ self.fail_node_not_alive(hostname)
+ else:
+ if self._check_if_in_slice([host_hrn]):
+ self.debug("The node %s is already in the slice" % hostname)
+ self._slicenode = True
+ self._node_to_provision = host_hrn
+ else:
+ self.fail_node_not_available(hostname)
+ super(PlanetlabSfaNode, self).do_discover()
+
+ else:
+ hosts_hrn = nodes.values()
+ nodes_inslice = self._check_if_in_slice(hosts_hrn)
+ nodes_not_inslice = list(set(hosts_hrn) - set(nodes_inslice))
+ host_hrn = None
+ if nodes_inslice:
+ host_hrn = self._choose_random_node(nodes, nodes_inslice)
+ self._slicenode = True
+
+ if not host_hrn:
+ # Either there were no matching nodes in the user's slice, or
+ # the nodes in the slice were blacklisted or being provisioned
+ # by other RM. Note nodes_not_inslice is never empty
+ host_hrn = self._choose_random_node(nodes, nodes_not_inslice)
+ self._slicenode = False
+
+ if host_hrn:
+ self._node_to_provision = host_hrn
+ try:
+ self._set_hostname_attr(host_hrn)
+ self.info(" Selected node to provision ")
+ super(PlanetlabSfaNode, self).do_discover()
+ except:
+ self._blacklist_node(host_hrn)
+ self.do_discover()
+ else:
+ self.fail_not_enough_nodes()
+
+ def _blacklisted(self, host_hrn):
+ """
+ Check in the SFA API that the node is not in the blacklist.
+ """
+ if self.sfaapi.blacklisted(host_hrn):
+ return True
+ return False
+
+ def _reserved(self, host_hrn):
+ """
+ Check in the SFA API that the node is not in the reserved
+ list.
+ """
+ if self.sfaapi.reserved(host_hrn):
+ return True
+ return False
+
+    def do_provision(self):
+        """
+        Add the discovered node to the user's slice and verify it works:
+        SSH reachability, /proc mounted, writable file system. A node that
+        fails any check is blacklisted and discovery is retried until a
+        good node is provisioned.
+        """
+        if self._skip_provision():
+            # No SFA credentials: plain LinuxNode provisioning.
+            super(PlanetlabSfaNode, self).do_provision()
+            return
+
+        provision_ok = False
+        ssh_ok = False
+        proc_ok = False  # NOTE(review): assigned but never updated below
+        timeout = 1800   # seconds to wait for SSH to come up
+
+        while not provision_ok:
+            node = self._node_to_provision
+            if not self._slicenode:
+                self._add_node_to_slice(node)
+
+            # check ssh connection
+            t = 0
+            while t < timeout and not ssh_ok:
+
+                cmd = 'echo \'GOOD NODE\''
+                ((out, err), proc) = self.execute(cmd)
+                if out.find("GOOD NODE") < 0:
+                    # Not reachable yet; retry every 60s until timeout.
+                    self.debug( "No SSH connection, waiting 60s" )
+                    t = t + 60
+                    time.sleep(60)
+                    continue
+                else:
+                    self.debug( "SSH OK" )
+                    ssh_ok = True
+                    continue
+            else:
+                # while/else clause: runs whenever the loop above exits
+                # without break -- including when it never ran because
+                # ssh_ok was still True from a previous outer iteration.
+                # Re-checks the SSH connection in that case.
+                cmd = 'echo \'GOOD NODE\''
+                ((out, err), proc) = self.execute(cmd)
+                if not out.find("GOOD NODE") < 0:
+                    ssh_ok = True
+
+            if not ssh_ok:
+                # the timeout was reach without establishing ssh connection
+                # the node is blacklisted, deleted from the slice, and a new
+                # node to provision is discovered
+                self.warning(" Could not SSH login ")
+                self._blacklist_node(node)
+                self.do_discover()
+                continue
+
+            # check /proc directory is mounted (ssh_ok = True)
+            # and file system is not read only
+            else:
+                cmd = 'mount |grep proc'
+                ((out1, err1), proc1) = self.execute(cmd)
+                cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
+                ((out2, err2), proc2) = self.execute(cmd)
+                if out1.find("/proc type proc") < 0 or \
+                    "Read-only file system".lower() in err2.lower():
+                    # Unusable file system: blacklist and pick another node.
+                    self.warning(" Corrupted file system ")
+                    self._blacklist_node(node)
+                    self.do_discover()
+                    continue
+
+                else:
+                    provision_ok = True
+                    if not self.get('hostname'):
+                        self._set_hostname_attr(node)
+                    self.info(" Node provisioned ")
+
+        super(PlanetlabSfaNode, self).do_provision()
+
+ def do_release(self):
+ super(PlanetlabSfaNode, self).do_release()
+ if self.state == ResourceState.RELEASED and not self._skip_provision():
+ self.debug(" Releasing SFA API ")
+ self.sfaapi.release()
+
+# def _filter_based_on_attributes(self):
+# """
+#        Retrieve the list of nodes hrn that match user's constraints
+# """
+# # Map user's defined attributes with tagnames of PlanetLab
+# timeframe = self.get("timeframe")[0]
+# attr_to_tags = {
+# 'city' : 'city',
+# 'country' : 'country',
+# 'region' : 'region',
+# 'architecture' : 'arch',
+# 'operatingSystem' : 'fcdistro',
+# 'minReliability' : 'reliability%s' % timeframe,
+# 'maxReliability' : 'reliability%s' % timeframe,
+# 'minBandwidth' : 'bw%s' % timeframe,
+# 'maxBandwidth' : 'bw%s' % timeframe,
+# 'minLoad' : 'load%s' % timeframe,
+# 'maxLoad' : 'load%s' % timeframe,
+# 'minCpu' : 'cpu%s' % timeframe,
+# 'maxCpu' : 'cpu%s' % timeframe,
+# }
+#
+# nodes_hrn = []
+# filters = {}
+#
+# for attr_name, attr_obj in self._attrs.iteritems():
+# attr_value = self.get(attr_name)
+#
+# if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
+# attr_name != 'timeframe':
+#
+# attr_tag = attr_to_tags[attr_name]
+# filters['tagname'] = attr_tag
+#
+# # filter nodes by fixed constraints e.g. operating system
+# if not 'min' in attr_name and not 'max' in attr_name:
+# filters['value'] = attr_value
+# nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
+#
+# # filter nodes by range constraints e.g. max bandwidth
+# elif ('min' or 'max') in attr_name:
+# nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
+#
+# if not filters:
+# nodes = self.sfaapi.get_resources_hrn()
+# for node in nodes:
+# nodes_hrn.append(node[node.key()])
+# return nodes_hrn
+#
+# def _filter_by_fixed_attr(self, filters, nodes_hrn):
+# """
+# Query SFA API for nodes matching fixed attributes defined by the
+# user
+# """
+# pass
+## node_tags = self.sfaapi.get_resources_tags(filters)
+## if node_tags is not None:
+##
+## if len(nodes_id) == 0:
+## # first attribute being matched
+## for node_tag in node_tags:
+## nodes_id.append(node_tag['node_id'])
+## else:
+## # remove the nodes ids that don't match the new attribute
+## # that is being match
+##
+## nodes_id_tmp = []
+## for node_tag in node_tags:
+## if node_tag['node_id'] in nodes_id:
+## nodes_id_tmp.append(node_tag['node_id'])
+##
+## if len(nodes_id_tmp):
+## nodes_id = set(nodes_id) & set(nodes_id_tmp)
+## else:
+## # no node from before match the new constraint
+## self.fail_discovery()
+## else:
+## # no nodes match the filter applied
+## self.fail_discovery()
+##
+## return nodes_id
+#
+# def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
+# """
+# Query PLCAPI for nodes ids matching attributes defined in a certain
+# range, by the user
+# """
+# pass
+## node_tags = self.plapi.get_node_tags(filters)
+## if node_tags:
+##
+## if len(nodes_id) == 0:
+## # first attribute being matched
+## for node_tag in node_tags:
+##
+## # check that matches the min or max restriction
+## if 'min' in attr_name and node_tag['value'] != 'n/a' and \
+## float(node_tag['value']) > attr_value:
+## nodes_id.append(node_tag['node_id'])
+##
+## elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
+## float(node_tag['value']) < attr_value:
+## nodes_id.append(node_tag['node_id'])
+## else:
+##
+## # remove the nodes ids that don't match the new attribute
+## # that is being match
+## nodes_id_tmp = []
+## for node_tag in node_tags:
+##
+## # check that matches the min or max restriction and was a
+## # matching previous filters
+## if 'min' in attr_name and node_tag['value'] != 'n/a' and \
+## float(node_tag['value']) > attr_value and \
+## node_tag['node_id'] in nodes_id:
+## nodes_id_tmp.append(node_tag['node_id'])
+##
+## elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
+## float(node_tag['value']) < attr_value and \
+## node_tag['node_id'] in nodes_id:
+## nodes_id_tmp.append(node_tag['node_id'])
+##
+## if len(nodes_id_tmp):
+## nodes_id = set(nodes_id) & set(nodes_id_tmp)
+## else:
+## # no node from before match the new constraint
+## self.fail_discovery()
+##
+## else: #TODO CHECK
+## # no nodes match the filter applied
+## self.fail_discovery()
+##
+## return nodes_id
+
+ def _choose_random_node(self, nodes, hosts_hrn):
+ """
+ From the possible nodes for provision, choose randomly to decrese the
+ probability of different RMs choosing the same node for provision
+ """
+ size = len(hosts_hrn)
+ while size:
+ size = size - 1
+ index = randint(0, size)
+ host_hrn = hosts_hrn[index]
+ hosts_hrn[index] = hosts_hrn[size]
+
+ # check the node is not blacklisted or being provision by other RM
+ # and perform ping to check that is really alive
+ if not self._blacklisted(host_hrn):
+ if not self._reserved(host_hrn):
+ print(self.sfaapi._reserved ,self.guid)
+ for hostname, hrn in nodes.items():
+ if host_hrn == hrn:
+ print('hostname' ,hostname)
+ ping_ok = self._do_ping(hostname)
+
+ if not ping_ok:
+ self._set_hostname_attr(hostname)
+ self.warning(" Node not responding PING ")
+ self._blacklist_node(host_hrn)
+ else:
+ # discovered node for provision, added to provision list
+ self._node_to_provision = host_hrn
+ return host_hrn
+
+# def _get_nodes_id(self, filters=None):
+# return self.plapi.get_nodes(filters, fields=['node_id'])
+#
+ def _add_node_to_slice(self, host_hrn):
+ """
+ Add node to slice, using SFA API.
+ """
+ self.info(" Adding node to slice ")
+ slicename = self.get("username").replace('_', '.')
+ slicename = 'ple.' + slicename
+ self.sfaapi.add_resource_to_slice(slicename, host_hrn)
+
+ def _delete_from_slice(self):
+ """
+ Delete every node from slice, using SFA API.
+ Sfi client doesn't work for particular node urns.
+ """
+ self.warning(" Deleting node from slice ")
+ slicename = self.get("username").replace('_', '.')
+ slicename = 'ple.' + slicename
+ self.sfaapi.remove_all_from_slice(slicename)
+
+ def _get_hostname(self):
+ """
+ Get the attribute hostname.
+ """
+ hostname = self.get("hostname")
+ if hostname:
+ return hostname
+ else:
+ return None
+
+ def _set_hostname_attr(self, node):
+ """
+ Query SFAAPI for the hostname of a certain host hrn and sets the
+ attribute hostname, it will over write the previous value.
+ """
+ hosts_hrn = self.sfaapi.get_resources_hrn()
+ for hostname, hrn in hosts_hrn.items():
+ if hrn == node:
+ self.set("hostname", hostname)
+
+ def _check_if_in_slice(self, hosts_hrn):
+ """
+ Check using SFA API if any host hrn from hosts_hrn is in the user's
+ slice.
+ """
+ slicename = self.get("username").replace('_', '.')
+ slicename = 'ple.' + slicename
+ slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
+ if slice_nodes:
+ slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
+ else: slice_nodes_hrn = []
+ nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
+ return nodes_inslice
+
+ def _do_ping(self, hostname):
+ """
+ Perform ping command on node's IP matching hostname.
+ """
+ ping_ok = False
+ ip = self._get_ip(hostname)
+ if ip:
+ command = "ping -c4 %s" % ip
+ (out, err) = lexec(command)
+
+ m = re.search("(\d+)% packet loss", str(out))
+ if m and int(m.groups()[0]) < 50:
+ ping_ok = True
+
+ return ping_ok
+
+    def _blacklist_node(self, host_hrn):
+        """
+        Register a malfunctioning node in the SFA API blacklist. Unless
+        the user pinned a specific hostname (self._hostname), also clear
+        the hostname attribute so a new node can be selected.
+        """
+        self.warning(" Blacklisting malfunctioning node ")
+        self.sfaapi.blacklist_resource(host_hrn)
+        if not self._hostname:
+            self.set('hostname', None)
+
+    def _reserve(self, host_hrn):
+        """
+        Mark *host_hrn* as being provisioned (via the SFA API), so other
+        RMs do not try to provision the same node concurrently.
+        """
+        self.sfaapi.reserve_resource(host_hrn)
+
+ def _get_ip(self, hostname):
+ """
+ Query cache for the IP of a node with certain hostname
+ """
+ try:
+ ip = sshfuncs.gethostbyname(hostname)
+ except:
+ # Fail while trying to find the IP
+ return None
+ return ip
+
+    def fail_discovery(self):
+        """
+        Log and raise: no candidate node matched the discovery filters.
+        """
+        msg = "Discovery failed. No candidates found for node"
+        self.error(msg)
+        raise RuntimeError(msg)
+
+ def fail_node_not_alive(self, hostname=None):
+ msg = "Node %s not alive" % hostname
+ raise RuntimeError(msg)
+
+ def fail_node_not_available(self, hostname):
+ msg = "Node %s not available for provisioning" % hostname
+ raise RuntimeError(msg)
+
+ def fail_not_enough_nodes(self):
+ msg = "Not enough nodes available for provisioning"
+ raise RuntimeError(msg)
+
+ def fail_sfaapi(self):
+ msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
+ " attributes sfauser and sfaPrivateKey."
+ raise RuntimeError(msg)
+
+    def valid_connection(self, guid):
+        """
+        Report whether a connection to the RM identified by *guid* is
+        valid. Validation is not implemented yet: always returns True.
+        """
+        # TODO: Validate!
+        return True
+
+