From 071f62fddeabab808850b303249adaaf84747c4c Mon Sep 17 00:00:00 2001 From: Lucia Guevgeozian Odizzio Date: Fri, 21 Mar 2014 11:20:55 +0100 Subject: [PATCH] Adding sfa support ple using hostname --- Makefile | 4 - .../linux/ccn/two_nodes_file_retrieval.py | 30 +- examples/planetlab/sfa.py | 53 ++ src/nepi/resources/linux/ccn/ccnd.py | 4 + src/nepi/resources/linux/ccn/ccnr.py | 1 + src/nepi/resources/linux/node.py | 2 +- src/nepi/resources/planetlab/sfa_node.py | 635 ++++++++++++++++++ src/nepi/util/sfa_api.py | 286 -------- src/nepi/util/sfaapi.py | 258 +++++++ src/nepi/util/sfarspec_proc.py | 197 ++++++ test/lib/test_utils.py | 14 + test/resources/linux/node.py | 5 +- test/resources/planetlab/sfa_node.py | 171 +++++ 13 files changed, 1357 insertions(+), 303 deletions(-) mode change 100644 => 100755 examples/linux/ccn/two_nodes_file_retrieval.py create mode 100755 examples/planetlab/sfa.py create mode 100644 src/nepi/resources/planetlab/sfa_node.py delete mode 100644 src/nepi/util/sfa_api.py create mode 100644 src/nepi/util/sfaapi.py create mode 100644 src/nepi/util/sfarspec_proc.py create mode 100755 test/resources/planetlab/sfa_node.py diff --git a/Makefile b/Makefile index 24e5b158..039820b2 100644 --- a/Makefile +++ b/Makefile @@ -11,11 +11,7 @@ SUBBUILDDIR = $(shell python -c 'import distutils.util, sys; \ PYTHON25 := $(shell python -c 'import sys; v = sys.version_info; \ print (1 if v[0] <= 2 and v[1] <= 5 else 0)') -ifeq ($(PYTHON25),0) -BUILDDIR := $(BUILDDIR)/$(SUBBUILDDIR) -else BUILDDIR := $(BUILDDIR)/lib -endif PYPATH = $(BUILDDIR):$(TESTLIB):$(PYTHONPATH) COVERAGE = $(or $(shell which coverage), $(shell which python-coverage), \ diff --git a/examples/linux/ccn/two_nodes_file_retrieval.py b/examples/linux/ccn/two_nodes_file_retrieval.py old mode 100644 new mode 100755 index 8e40c3bd..1d406d29 --- a/examples/linux/ccn/two_nodes_file_retrieval.py +++ b/examples/linux/ccn/two_nodes_file_retrieval.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python # # NEPI, a framework to manage network experiments # Copyright (C) 2014 INRIA @@ -31,8 +32,9 @@ from nepi.execution.ec import ExperimentController import os -ssh_key = ####### <<< ASSING the absolute path to the private SSH key to login into the remote host >>> -ssh_user = ####### <<< ASSING the SSH username >>> +#ssh_key = ####### <<< ASSING the absolute path to the private SSH key to login into the remote host >>> +#ssh_user = ####### <<< ASSING the SSH username >>> +ssh_user = "icnuser" ## Create the experiment controller ec = ExperimentController(exp_id = "demo_CCN") @@ -40,28 +42,32 @@ ec = ExperimentController(exp_id = "demo_CCN") ## Register node 1 node1 = ec.register_resource("LinuxNode") # Set the hostname of the first node to use for the experiment -hostname1 = "peeramidion.irisa.fr" ##### <<< ASSIGN the hostname of a host you have SSSH access to >>> +#hostname1 = "peeramidion.irisa.fr" ##### <<< ASSIGN the hostname of a host you have SSSH access to >>> +hostname1 = "133.69.33.148" ec.set(node1, "hostname", hostname1) # username should be your SSH user ec.set(node1, "username", ssh_user) # Absolute path to the SSH private key -ec.set(node1, "identity", ssh_key) +#ec.set(node1, "identity", ssh_key) # Clean all files, results, etc, from previous experiments wit the same exp_id -ec.set(node1, "cleanExperiment", True) +#ec.set(node1, "cleanExperiment", True) +ec.set(node1, "cleanHome", True) # Kill all running processes in the node before running the experiment ec.set(node1, "cleanProcesses", True) ## Register node 2 node2 = ec.register_resource("LinuxNode") # Set the hostname of the first node to use for the experiment -hostname2 = "planetlab2.upc.es" ##### <<< ASSIGN the hostname of a host you have SSSH access to >>> +#hostname2 = "planetlab2.upc.es" ##### <<< ASSIGN the hostname of a host you have SSSH access to >>> +hostname2 = "133.69.33.149" ec.set(node2, "hostname", hostname2) # username should be your SSH user ec.set(node2, "username", ssh_user) # Absolute path to the SSH private key -ec.set(node2, "identity", ssh_key) +#ec.set(node2, "identity", ssh_key) # Clean all files, results, etc, from previous experiments wit the same exp_id -ec.set(node2, "cleanExperiment", True) +#ec.set(node2, "cleanExperiment", True) +ec.set(node2, "cleanHome", True) # Kill all running processes in the node before running the experiment ec.set(node2, "cleanProcesses", True) @@ -69,12 +75,14 @@ ec.set(node2, "cleanProcesses", True) ccnd1 = ec.register_resource("LinuxCCND") # Set ccnd log level to 7 ec.set(ccnd1, "debug", 7) +ec.set(ccnd1, "port", 9597) ec.register_connection(ccnd1, node1) ## Register a CCN daemon in node 2 ccnd2 = ec.register_resource("LinuxCCND") # Set ccnd log level to 7 ec.set(ccnd2, "debug", 7) +ec.set(ccnd2, "port", 9597) ec.register_connection(ccnd2, node2) ## Register a repository in node 1 @@ -97,12 +105,14 @@ ec.register_connection(co, ccnr1) # Register a FIB entry from node 1 to node 2 entry1 = ec.register_resource("LinuxFIBEntry") -ec.set(entry1, "host", hostname2) +ec.set(entry1, "host", "10.0.32.2") +ec.set(entry1, "port", 9597) ec.register_connection(entry1, ccnd1) # Register a FIB entry from node 2 to node 1 entry2 = ec.register_resource("LinuxFIBEntry") -ec.set(entry2, "host", hostname1) +ec.set(entry2, "host", "10.0.0.2") +ec.set(entry2, "port", 9597) ec.register_connection(entry2, ccnd2) ## Retrieve the file stored in node 1 from node 2 diff --git a/examples/planetlab/sfa.py b/examples/planetlab/sfa.py new file mode 100755 index 00000000..c5b393e1 --- /dev/null +++ b/examples/planetlab/sfa.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Lucia Guevgeozian + +from nepi.execution.ec import ExperimentController +import os + +# Create the EC +exp_id = "sfa_test" +ec = ExperimentController(exp_id) + +username = os.environ.get('SFA_SLICE') +sfauser = os.environ.get('SFA_USER') +sfaPrivateKey = os.environ.get('SFA_PK') + +# server +node1 = ec.register_resource("PlanetlabSfaNode") +ec.set(node1, "hostname", 'planetlab-4.imperial.ac.uk') +ec.set(node1, "username", username) +ec.set(node1, "sfauser", sfauser) +ec.set(node1, "sfaPrivateKey", sfaPrivateKey) +ec.set(node1, "cleanHome", True) +ec.set(node1, "cleanProcesses", True) + +app1 = ec.register_resource("LinuxApplication") +command = "ping -c5 google.com" +ec.set(app1, "command", command) +ec.register_connection(app1, node1) + +# Deploy +ec.deploy() + +ec.wait_finished([app1]) + +ec.shutdown() + +# End diff --git a/src/nepi/resources/linux/ccn/ccnd.py b/src/nepi/resources/linux/ccn/ccnd.py index ffaee508..44bc2761 100644 --- a/src/nepi/resources/linux/ccn/ccnd.py +++ b/src/nepi/resources/linux/ccn/ccnd.py @@ -136,6 +136,10 @@ class LinuxCCND(LinuxApplication): def path(self): return "PATH=$PATH:${BIN}/%s/" % self.version + @property + def environment(self): + return self._environment() + def do_deploy(self): if not self.node or self.node.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) diff --git a/src/nepi/resources/linux/ccn/ccnr.py b/src/nepi/resources/linux/ccn/ccnr.py index 9ac5cce0..e839a56a 100644 --- a/src/nepi/resources/linux/ccn/ccnr.py +++ b/src/nepi/resources/linux/ccn/ccnr.py @@ -296,6 +296,7 @@ class LinuxCCNR(LinuxApplication): }) env = self.ccnd.path + #env = self.ccnd.path + self.ccnd.environment env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), self.get(k)) \ if self.get(k) else "", envs.keys())) diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 8ad69f32..13cf0767 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -681,7 +681,7 @@ class LinuxNode(ResourceManager): # If dst files should not be overwritten, check that the files do not # exits already if isinstance(src, str): - src = map(str.strip, src.split(";")) + src = map(os.path.expanduser, map(str.strip, src.split(";"))) if overwrite == False: src = self.filter_existing_files(src, dst) diff --git a/src/nepi/resources/planetlab/sfa_node.py b/src/nepi/resources/planetlab/sfa_node.py new file mode 100644 index 00000000..bb823d56 --- /dev/null +++ b/src/nepi/resources/planetlab/sfa_node.py @@ -0,0 +1,635 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Lucia Guevgeozian + +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay +from nepi.resources.linux.node import LinuxNode +from nepi.util.sfaapi import SFAAPIFactory +from nepi.util.execfuncs import lexec +from nepi.util import sshfuncs + +from random import randint +import time +import socket +import threading +import datetime + +@clsinit_copy +class PlanetlabSfaNode(LinuxNode): + _rtype = "PlanetlabSfaNode" + _help = "Controls a PlanetLab host accessible using a SSH key " \ + "and provisioned using SFA" + _backend = "planetlab" + + lock = threading.Lock() + + @classmethod + def _register_attributes(cls): + + sfa_user = Attribute("sfauser", "SFA user", + flags = Flags.Credential) + + sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \ + used to generate the user credential") + + city = Attribute("city", "Constrain location (city) during resource \ + discovery. May use wildcards.", + flags = Flags.Filter) + + country = Attribute("country", "Constrain location (country) during \ + resource discovery. May use wildcards.", + flags = Flags.Filter) + + region = Attribute("region", "Constrain location (region) during \ + resource discovery. May use wildcards.", + flags = Flags.Filter) + + architecture = Attribute("architecture", "Constrain architecture \ + during resource discovery.", + type = Types.Enumerate, + allowed = ["x86_64", + "i386"], + flags = Flags.Filter) + + operating_system = Attribute("operatingSystem", "Constrain operating \ + system during resource discovery.", + type = Types.Enumerate, + allowed = ["f8", + "f12", + "f14", + "centos", + "other"], + flags = Flags.Filter) + + min_reliability = Attribute("minReliability", "Constrain reliability \ + while picking PlanetLab nodes. Specifies a lower \ + acceptable bound.", + type = Types.Double, + range = (1, 100), + flags = Flags.Filter) + + max_reliability = Attribute("maxReliability", "Constrain reliability \ + while picking PlanetLab nodes. Specifies an upper \ + acceptable bound.", + type = Types.Double, + range = (1, 100), + flags = Flags.Filter) + + min_bandwidth = Attribute("minBandwidth", "Constrain available \ + bandwidth while picking PlanetLab nodes. \ + Specifies a lower acceptable bound.", + type = Types.Double, + range = (0, 2**31), + flags = Flags.Filter) + + max_bandwidth = Attribute("maxBandwidth", "Constrain available \ + bandwidth while picking PlanetLab nodes. \ + Specifies an upper acceptable bound.", + type = Types.Double, + range = (0, 2**31), + flags = Flags.Filter) + + min_load = Attribute("minLoad", "Constrain node load average while \ + picking PlanetLab nodes. Specifies a lower acceptable \ + bound.", + type = Types.Double, + range = (0, 2**31), + flags = Flags.Filter) + + max_load = Attribute("maxLoad", "Constrain node load average while \ + picking PlanetLab nodes. Specifies an upper acceptable \ + bound.", + type = Types.Double, + range = (0, 2**31), + flags = Flags.Filter) + + min_cpu = Attribute("minCpu", "Constrain available cpu time while \ + picking PlanetLab nodes. Specifies a lower acceptable \ + bound.", + type = Types.Double, + range = (0, 100), + flags = Flags.Filter) + + max_cpu = Attribute("maxCpu", "Constrain available cpu time while \ + picking PlanetLab nodes. Specifies an upper acceptable \ + bound.", + type = Types.Double, + range = (0, 100), + flags = Flags.Filter) + + timeframe = Attribute("timeframe", "Past time period in which to check\ + information about the node. Values are year,month, \ + week, latest", + default = "week", + type = Types.Enumerate, + allowed = ["latest", + "week", + "month", + "year"], + flags = Flags.Filter) + +# plblacklist = Attribute("blacklist", "Take into account the file plblacklist \ +# in the user's home directory under .nepi directory. This file \ +# contains a list of PL nodes to blacklist, and at the end \ +# of the experiment execution the new blacklisted nodes are added.", +# type = Types.Bool, +# default = True, +# flags = Flags.ReadOnly) +# + + cls._register_attribute(sfa_user) + cls._register_attribute(sfa_private_key) + cls._register_attribute(city) + cls._register_attribute(country) + cls._register_attribute(region) + cls._register_attribute(architecture) + cls._register_attribute(operating_system) + cls._register_attribute(min_reliability) + cls._register_attribute(max_reliability) + cls._register_attribute(min_bandwidth) + cls._register_attribute(max_bandwidth) + cls._register_attribute(min_load) + cls._register_attribute(max_load) + cls._register_attribute(min_cpu) + cls._register_attribute(max_cpu) + cls._register_attribute(timeframe) + + def __init__(self, ec, guid): + super(PlanetlabSfaNode, self).__init__(ec, guid) + + self._sfaapi = None + self._node_to_provision = None + self._slicenode = False + self._hostname = False + + if self.get("gateway") or self.get("gatewayUser"): + self.set("gateway", None) + self.set("gatewayUser", None) + + def _skip_provision(self): + sfa_user = self.get("sfauser") + if not sfa_user: + return True + else: return False + + @property + def sfaapi(self): + if not self._sfaapi: + sfa_user = self.get("sfauser") + sfa_sm = "http://sfa3.planet-lab.eu:12346/" + sfa_auth = '.'.join(sfa_user.split('.')[:2]) + sfa_registry = "http://sfa3.planet-lab.eu:12345/" + sfa_private_key = self.get("sfaPrivateKey") + + self._sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, + sfa_registry, sfa_sm, sfa_private_key) + + if not self._sfaapi: + self.fail_sfaapi() + + return self._sfaapi + + def do_discover(self): + """ + Based on the attributes defined by the user, discover the suitable + nodes for provision. + """ + if self._skip_provision(): + super(PlanetlabSfaNode, self).do_discover() + return + + nodes = self.sfaapi.get_resources_hrn() + + hostname = self._get_hostname() + if hostname: + # the user specified one particular node to be provisioned + self._hostname = True + host_hrn = nodes[hostname] + print host_hrn + + # check that the node is not blacklisted or being provisioned + # by other RM + with PlanetlabSfaNode.lock: + plist = self.sfaapi.reserved() + blist = self.sfaapi.blacklisted() + if host_hrn not in blist and host_hrn not in plist: + + # check that is really alive, by performing ping + ping_ok = self._do_ping(hostname) + if not ping_ok: + self._blacklist_node(host_hrn) + self.fail_node_not_alive(hostname) + else: + if self._check_if_in_slice([host_hrn]): + self._slicenode = True + self._put_node_in_provision(host_hrn) + self._node_to_provision = host_hrn + else: + self.fail_node_not_available(hostname) + super(PlanetlabSfaNode, self).do_discover() + + else: + # the user specifies constraints based on attributes, zero, one or + # more nodes can match these constraints + nodes = self._filter_based_on_attributes() + + # nodes that are already part of user's slice have the priority to + # provisioned + nodes_inslice = self._check_if_in_slice(nodes) + nodes_not_inslice = list(set(nodes) - set(nodes_inslice)) + + node_id = None + if nodes_inslice: + node_id = self._choose_random_node(nodes_inslice) + self._slicenode = True + + if not node_id: + # Either there were no matching nodes in the user's slice, or + # the nodes in the slice were blacklisted or being provisioned + # by other RM. Note nodes_not_inslice is never empty + node_id = self._choose_random_node(nodes_not_inslice) + self._slicenode = False + + if node_id: + self._node_to_provision = node_id + try: + self._set_hostname_attr(node_id) + self.info(" Selected node to provision ") + super(PlanetlabSfaNode, self).do_discover() + except: + with PlanetlabSfaNode.lock: + self._blacklist_node(node_id) + self.do_discover() + else: + self.fail_not_enough_nodes() + + def do_provision(self): + """ + Add node to user's slice after verifing that the node is functioning + correctly. + """ + if self._skip_provision(): + super(PlanetlabSfaNode, self).do_provision() + return + + provision_ok = False + ssh_ok = False + proc_ok = False + timeout = 1800 + + while not provision_ok: + node = self._node_to_provision + if not self._slicenode: + self._add_node_to_slice(node) + + # check ssh connection + t = 0 + while t < timeout and not ssh_ok: + + cmd = 'echo \'GOOD NODE\'' + ((out, err), proc) = self.execute(cmd) + if out.find("GOOD NODE") < 0: + t = t + 60 + time.sleep(60) + continue + else: + ssh_ok = True + continue + else: + cmd = 'echo \'GOOD NODE\'' + ((out, err), proc) = self.execute(cmd) + if not out.find("GOOD NODE") < 0: + ssh_ok = True + + if not ssh_ok: + # the timeout was reach without establishing ssh connection + # the node is blacklisted, deleted from the slice, and a new + # node to provision is discovered + with PlanetlabSfaNode.lock: + self.warn(" Could not SSH login ") + self._blacklist_node(node) + #self._delete_node_from_slice(node) + self.do_discover() + continue + + # check /proc directory is mounted (ssh_ok = True) + # and file system is not read only + else: + cmd = 'mount |grep proc' + ((out1, err1), proc1) = self.execute(cmd) + cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile' + ((out2, err2), proc2) = self.execute(cmd) + if out1.find("/proc type proc") < 0 or \ + "Read-only file system".lower() in err2.lower(): + with PlanetlabNode.lock: + self.warn(" Corrupted file system ") + self._blacklist_node(node) + #self._delete_node_from_slice(node) + self.do_discover() + continue + + else: + provision_ok = True + if not self.get('hostname'): + self._set_hostname_attr(node) + self.info(" Node provisioned ") + + super(PlanetlabSfaNode, self).do_provision() + + def _filter_based_on_attributes(self): + """ + Retrive the list of nodes hrn that match user's constraints + """ + # Map user's defined attributes with tagnames of PlanetLab + timeframe = self.get("timeframe")[0] + attr_to_tags = { + 'city' : 'city', + 'country' : 'country', + 'region' : 'region', + 'architecture' : 'arch', + 'operatingSystem' : 'fcdistro', + 'minReliability' : 'reliability%s' % timeframe, + 'maxReliability' : 'reliability%s' % timeframe, + 'minBandwidth' : 'bw%s' % timeframe, + 'maxBandwidth' : 'bw%s' % timeframe, + 'minLoad' : 'load%s' % timeframe, + 'maxLoad' : 'load%s' % timeframe, + 'minCpu' : 'cpu%s' % timeframe, + 'maxCpu' : 'cpu%s' % timeframe, + } + + nodes_hrn = [] + filters = {} + + for attr_name, attr_obj in self._attrs.iteritems(): + attr_value = self.get(attr_name) + + if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \ + attr_name != 'timeframe': + + attr_tag = attr_to_tags[attr_name] + filters['tagname'] = attr_tag + + # filter nodes by fixed constraints e.g. operating system + if not 'min' in attr_name and not 'max' in attr_name: + filters['value'] = attr_value + nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn) + + # filter nodes by range constraints e.g. max bandwidth + elif ('min' or 'max') in attr_name: + nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn) + + if not filters: + nodes = self.sfaapi.get_resources_hrn() + for node in nodes: + nodes_hrn.append(node[node.key()]) + return nodes_hrn + + def _filter_by_fixed_attr(self, filters, nodes_hrn): + """ + Query SFA API for nodes matching fixed attributes defined by the + user + """ + pass +# node_tags = self.sfaapi.get_resources_tags(filters) +# if node_tags is not None: +# +# if len(nodes_id) == 0: +# # first attribute being matched +# for node_tag in node_tags: +# nodes_id.append(node_tag['node_id']) +# else: +# # remove the nodes ids that don't match the new attribute +# # that is being match +# +# nodes_id_tmp = [] +# for node_tag in node_tags: +# if node_tag['node_id'] in nodes_id: +# nodes_id_tmp.append(node_tag['node_id']) +# +# if len(nodes_id_tmp): +# nodes_id = set(nodes_id) & set(nodes_id_tmp) +# else: +# # no node from before match the new constraint +# self.fail_discovery() +# else: +# # no nodes match the filter applied +# self.fail_discovery() +# +# return nodes_id + + def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id): + """ + Query PLCAPI for nodes ids matching attributes defined in a certain + range, by the user + """ + pass +# node_tags = self.plapi.get_node_tags(filters) +# if node_tags: +# +# if len(nodes_id) == 0: +# # first attribute being matched +# for node_tag in node_tags: +# +# # check that matches the min or max restriction +# if 'min' in attr_name and node_tag['value'] != 'n/a' and \ +# float(node_tag['value']) > attr_value: +# nodes_id.append(node_tag['node_id']) +# +# elif 'max' in attr_name and node_tag['value'] != 'n/a' and \ +# float(node_tag['value']) < attr_value: +# nodes_id.append(node_tag['node_id']) +# else: +# +# # remove the nodes ids that don't match the new attribute +# # that is being match +# nodes_id_tmp = [] +# for node_tag in node_tags: +# +# # check that matches the min or max restriction and was a +# # matching previous filters +# if 'min' in attr_name and node_tag['value'] != 'n/a' and \ +# float(node_tag['value']) > attr_value and \ +# node_tag['node_id'] in nodes_id: +# nodes_id_tmp.append(node_tag['node_id']) +# +# elif 'max' in attr_name and node_tag['value'] != 'n/a' and \ +# float(node_tag['value']) < attr_value and \ +# node_tag['node_id'] in nodes_id: +# nodes_id_tmp.append(node_tag['node_id']) +# +# if len(nodes_id_tmp): +# nodes_id = set(nodes_id) & set(nodes_id_tmp) +# else: +# # no node from before match the new constraint +# self.fail_discovery() +# +# else: #TODO CHECK +# # no nodes match the filter applied +# self.fail_discovery() +# +# return nodes_id + + def _choose_random_node(self, nodes): + """ + From the possible nodes for provision, choose randomly to decrese the + probability of different RMs choosing the same node for provision + """ + size = len(nodes) + while size: + size = size - 1 + index = randint(0, size) + node_id = nodes[index] + nodes[index] = nodes[size] + + # check the node is not blacklisted or being provision by other RM + # and perform ping to check that is really alive + with PlanetlabNode.lock: + + blist = self.plapi.blacklisted() + plist = self.plapi.reserved() + if node_id not in blist and node_id not in plist: + ping_ok = self._do_ping(node_id) + if not ping_ok: + self._set_hostname_attr(node_id) + self.warn(" Node not responding PING ") + self._blacklist_node(node_id) + else: + # discovered node for provision, added to provision list + self._put_node_in_provision(node_id) + return node_id + + def _get_nodes_id(self, filters=None): + return self.plapi.get_nodes(filters, fields=['node_id']) + + def _add_node_to_slice(self, host_hrn): + self.info(" Adding node to slice ") + slicename = self.get("username").replace('_', '.') + slicename = 'ple.' + slicename + self.sfaapi.add_resource_to_slice(slicename, host_hrn) + + def _delete_node_from_slice(self, node): + self.warn(" Deleting node from slice ") + slicename = self.get("username") + self.plapi.delete_slice_node(slicename, [node]) + + def _get_hostname(self): + hostname = self.get("hostname") + if hostname: + return hostname + else: + return None + + def _set_hostname_attr(self, node): + """ + Query PLCAPI for the hostname of a certain node id and sets the + attribute hostname, it will over write the previous value + """ + hostname = self.plapi.get_nodes(node, ['hostname']) + self.set("hostname", hostname[0]['hostname']) + + def _check_if_in_slice(self, hosts_hrn): + """ + Check using SFA API if any host hrn from hosts_hrn is in the user's + slice + """ + slicename = self.get("username").replace('_', '.') + slicename = 'ple.' + slicename + slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource'] + slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes) + nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn)) + return nodes_inslice + + def _do_ping(self, hostname): + """ + Perform ping command on node's IP matching hostname + """ + ping_ok = False + ip = self._get_ip(hostname) + if not ip: return ping_ok + + command = "ping -c4 %s" % ip + + (out, err) = lexec(command) + if not str(out).find("2 received") < 0 or not str(out).find("3 received") < 0 or not \ + str(out).find("4 received") < 0: + ping_ok = True + + return ping_ok + + def _blacklist_node(self, host_hrn): + """ + Add node mal functioning node to blacklist + """ + self.warn(" Blacklisting malfunctioning node ") + self.sfaapi.blacklist_resource(host_hrn) + if not self._hostname: + self.set('hostname', None) + + def _put_node_in_provision(self, host_hrn): + """ + Add node to the list of nodes being provisioned, in order for other RMs + to not try to provision the same one again + """ + self.sfaapi.reserve_resource(host_hrn) + + def _get_ip(self, hostname): + """ + Query PLCAPI for the IP of a node with certain node id + """ + try: + ip = sshfuncs.gethostbyname(hostname) + except: + # Fail while trying to find the IP + return None + return ip + + def _get_nodes_hrn(self): + nodes = self.sfaapi.get_resouces_hrn() + + + + def fail_discovery(self): + msg = "Discovery failed. No candidates found for node" + self.error(msg) + raise RuntimeError, msg + + def fail_node_not_alive(self, hostname=None): + msg = "Node %s not alive" % hostname + raise RuntimeError, msg + + def fail_node_not_available(self, hostname): + msg = "Node %s not available for provisioning" % hostname + raise RuntimeError, msg + + def fail_not_enough_nodes(self): + msg = "Not enough nodes available for provisioning" + raise RuntimeError, msg + + def fail_plapi(self): + msg = "Failing while trying to instanciate the PLC API.\nSet the" + \ + " attributes pluser and plpassword." + raise RuntimeError, msg + + def valid_connection(self, guid): + # TODO: Validate! + return True + + diff --git a/src/nepi/util/sfa_api.py b/src/nepi/util/sfa_api.py deleted file mode 100644 index e77c7259..00000000 --- a/src/nepi/util/sfa_api.py +++ /dev/null @@ -1,286 +0,0 @@ -# -# NEPI, a framework to manage network experiments -# Copyright (C) 2013 INRIA -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -# Author: Alina Quereilhac -# Lucia Guevgeozian Odizzio - - -import logging -import hashlib - -from sfa_sfav1 import SFAResourcesParser -import subprocess - -# TODO: Use nepi utils Logger instead of warnings! -import warnings - -import threading - -class SFAApi(object): - - def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None, - sfi_registry = None, sfi_sm = None, timeout = None, private_key = None): - - self._resources = dict() - self._reservable_resources = list() - self._leases = dict() - self._slice_tags = dict() - self._slice_resources = set() - self._slice_leases = set() - self._aggregate = aggregate - self._slice_hrn = slice_id - # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3 - # For now is SFA V1 from PlanetLab and Nitos (wrong namespace) - self._parser = sfa_sfav1.SFAResourcesParser(['ple', 'omf']) - self._lock = threading.Lock() - - # Paremeters to contact the XMLRPC SFA service - self._sfi_parameters = {'-a': sfi_auth, '-u': sfi_user, - '-r': sfi_registry, '-s': sfi_sm, '-t': timeout, - '-k': private_key} - - #self._logger = logging.getLogger('nepi.utils.sfiapi') - self._fetch_resources_info() - self._fetch_slice_info() - - def _sfi_command_options(self): - command_options = " ".join("%s %s" % (k,v) for (k,v) in \ - self._sfi_parameters.iteritems() if v is not None) - return command_options - - def _sfi_command_exec(self, command): - args = command.split(" ") - s = subprocess.Popen(args, stdout = subprocess.PIPE, - stdin = subprocess.PIPE) - xml, err = s.communicate() - if err: - raise RuntimeError("Command excecution problem, error: %s", err) - return xml - - def _fetch_resources_info(self, resources = True): - command_options = self._sfi_command_options() - command = "sfi.py " + command_options + " resources -l all" - try: - xml = self._sfi_command_exec(command) - except: - #self._logger.error("Error in SFA responds: %s", xml) - raise - if resources: - self._resources, self._leases = self._parser.resources_from_xml(xml, resources = True) - else: - self._leases = self._parser.resources_from_xml(xml) - #self._update_reservable() - return xml - - def _fetch_slice_info(self): - command_options = self._sfi_command_options() - command = "sfi.py " + command_options + " resources -l all" - command = command + " " + self._slice_hrn - try: - xml = self._sfi_command_exec(command) - except: - #self._logger.error("Error in SFA responds: %s", xml) - raise - self._slice_resources, self._slice_leases, self._slice_tags = \ - self._parser.resources_from_xml(xml, sliver = True, resources = True) - return xml - - def _update_reservable(self): - for rid, r in self._resources.iteritems(): - if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE') \ - or (r['resource_type'] == 'channel'): - self._reservable_resources.append(rid) - - - def discover_resources(self, resourceId=None, fields=[], **kwargs): - result = dict() - resources = self._resources - - if resourceId is not None: - resource_ids = resourceId - if not isinstance(resource_ids, list): - resource_ids = [resource_ids] - resources = self._filter_by_resourceId(resources, resource_ids) - else: - for filter, value in kwargs.items(): - resources = self._filter_by_filter(resources, filter, value) - if not fields: - return resources - else: - for k, info in resources.iteritems(): - info = self._extract_fields(info, fields) - result[k] = info - return result - - def _filter_by_resourceId(self, resources, resource_ids): - return dict((k, resources[k]) for k in resource_ids if k in resources) - - def _filter_by_filter(self, resources, filter, value): - d = dict() - for k in resources.keys(): - if filter in resources[k]: - if resources[k][filter] == value: - d[k] = resources[k] - return d - - def _extract_fields(self, info, fields): - return dict((k, info[k]) for k in fields if k in info) - - def discover_fields(self): - resources = self._resources - fields = [] - for k, data in resources.iteritems(): - for field in data: - if field not in fields: - fields.append(field) - return fields - - def discover_leases(self, resourceId=None): - leases = self._leases - - if resourceId is not None: - resource_ids = resourceId - if not isinstance(resourceId, list): - resource_ids = [resource_ids] - leases = self._filterbyresourceId(leases, resource_ids) - return leases - - def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot): - result = dict() - if rtype not in ['node', 'channel']: - raise RuntimeError("Unknown type") - - finish_time = start_time + duration * slot - - leases_resources = dict() - reservable_resources = dict() - for lid, lease in leases.iteritems(): - if lease[0]['type'] == rtype: - leases_resources.update({lid: lease}) - #print leases_resources - for rid, resource in resources.iteritems(): - if rtype == 'node' and (resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE'): - reservable_resources.update({rid: resource}) - elif rtype == 'channel': - reservable_resources.update({rid: resource}) - #if resource['type'] == 'rtype' and resources['exclusive'].upper() == 'TRUE':\ - # (in case adding exclusive tag to channels) - - free_resources = list(set(reservable_resources.keys()) - set(leases_resources.keys())) - - if len(free_resources) >= quantity: - free_resources = free_resources[:quantity] - for rid, resource in resources.iteritems(): - if rid in free_resources: - result[rid] = resource - return result - else: - maybe_free = [] - new_quan = quantity - len(free_resources) - print new_quan - - for lid, lease in leases_resources.iteritems(): - for l in lease: - st = int(l['start_time']) - ft = st + int(l['duration']) * slot - if (st <= finish_time <= ft) or (st <= start_time <= ft): - if lid in maybe_free: - maybe_free.remove(lid) - break - else: - if lid not in maybe_free: - maybe_free.append(lid) - if len(maybe_free) >= new_quan: - free_resources = [free_resources, maybe_free] - free_resources = sum(free_resources, []) - for rid, resource in resources.iteritems(): - if rid in free_resources: - result[rid] = resource - return result - #return free_resources - warnings.warn("There aren't enough nodes") - - - def provision_resource(self, new_resource, start_time = None, duration = None): - import os, tempfile - with self._lock: - xml = self._fetch_slice_info() - new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,\ - new_resource, start_time, duration, self._aggregate) - fh, fname = tempfile.mkstemp() - print fname - os.write(fh, new_xml) - os.close(fh) - try: - command_options = self._sfi_command_options() - command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname) - out = self._sfi_command_exec(command) - except: - raise - xml = self._fetch_slice_info() - return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource, start_time,\ - duration, self._aggregate) - - def release_resource(self, resource, start_time = None, duration = None): - import os, tempfile - with self._lock: - xml = self._fetch_slice_info() - new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn, resource,\ - start_time, duration, self._aggregate) - fh, fname = tempfile.mkstemp() - print fname - os.write(fh, new_xml) - os.close(fh) - try: - command_options = self._sfi_command_options() - command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname) - out = self._sfi_command_exec(command) - except: - raise - xml = self._fetch_slice_info() - return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource, start_time,\ - duration, self._aggregate) - - -class SFAApiFactory(object): - lock = threading.Lock() - _apis = dict() - - @classmethod - def get_api(slice_id = None, sfi_auth = None, sfi_user = None, - sfi_registry = None, sfi_sm = None, timeout = None, private_key = None): - - key = cls.make_key(slice_id, sfi_auth, sfi_user, sfi_registry, - sfi_sm, timeout, private_key, aggregate = 'ple') - api = cls._apis.get(key) - cls.lock.acquire() - api._fetch_resources_info(resources = False) - api._fetch_slice_info() - cls.lock.release() - - if not api: - api = SFAApi(slice_id = None, sfi_auth = None, sfi_user = None, - sfi_registry = None, sfi_sm = None, timeout = None, private_key = None) - cls._apis[key] = api - - return api - - @classmethod - def make_key(cls, *args): - skey = "".join(map(str, args)) - return hashlib.md5(skey).hexdigest() - diff --git a/src/nepi/util/sfaapi.py b/src/nepi/util/sfaapi.py new file mode 100644 index 00000000..12e6af07 --- /dev/null +++ b/src/nepi/util/sfaapi.py @@ -0,0 +1,258 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Lucia Guevgeozian Odizzio + +import threading +import hashlib +import re +import os + +from nepi.util.logger import Logger + +try: + from sfa.client.sfi import Sfi + from sfa.util.xrn import hrn_to_urn +except ImportError: + log = Logger("SFA API") + log.debug("Packages sfa-common or sfa-client not installed.\ + Could not import sfa.client.sfi or sfa.util.xrn") + +from nepi.util.sfarspec_proc import SfaRSpecProcessing + +class SFAAPI(object): + """ + API for quering the SFA service. + """ + def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, + timeout): + + self.sfi_user = sfi_user + self.sfi_auth = sfi_auth + self.sfi_registry = sfi_registry + self.sfi_sm = sfi_sm + self.private_key = private_key + self.timeout = timeout + + self._blacklist = set() + self._reserved = set() + self._resources_cache = None + self._already_cached = False + self._log = Logger("SFA API") + self.api = Sfi() + self.rspec_proc = SfaRSpecProcessing() + self.lock = threading.Lock() + + def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None): + """ + Execute sfi method. + """ + if command in ['describe', 'delete', 'allocate', 'provision']: + if not slicename: + raise TypeError("The slice hrn is expected for this method %s" % command) + if command == 'allocate' and not rspec: + raise TypeError("RSpec is expected for this method %s" % command) + + if command == 'allocate': + args_list = [slicename, rspec] + elif command == 'delete': + args_list = [slicename, urn] + else: args_list = [slicename, '-o', '/tmp/rspec_output'] + + elif command == 'resources': + args_list = ['-o', '/tmp/rspec_output'] + + else: raise TypeError("Sfi method not supported") + + self.api.options.timeout = self.timeout + self.api.options.raw = None + self.api.options.user = self.sfi_user + self.api.options.auth = self.sfi_auth + self.api.options.registry = self.sfi_registry + self.api.options.sm = self.sfi_sm + self.api.options.user_private_key = self.private_key + + self.api.command = command + self.api.command_parser = self.api.create_parser_command(self.api.command) + (command_options, command_args) = self.api.command_parser.parse_args(args_list) + self.api.command_options = command_options + self.api.read_config() + self.api.bootstrap() + + self.api.dispatch(command, command_options, command_args) + with open("/tmp/rspec_output.rspec", "r") as result_file: + result = result_file.read() + return result + + def get_resources_info(self): + """ + Get all resources and its attributes from aggregate. + """ + try: + rspec_slice = self._sfi_exec_method('resources') + except: + raise RuntimeError("Fail to list resources") + + self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice) + self._already_cached = True + return self._resources_cache + + def get_resources_hrn(self, resources=None): + """ + Get list of resources hrn, without the resource info. + """ + if not resources: + if not self._already_cached: + resources = self.get_resources_info()['resource'] + else: + resources = self._resources_cache['resource'] + + component_tohrn = dict() + for resource in resources: + hrn = resource['hrn'].replace('\\', '') + component_tohrn[resource['component_name']] = hrn + + return component_tohrn + + def get_slice_resources(self, slicename): + """ + Get resources and info from slice. + """ + try: + with self.lock: + rspec_slice = self._sfi_exec_method('describe', slicename) + except: + raise RuntimeError("Fail to allocate resource for slice %s" % slicename) + + result = self.rspec_proc.parse_sfa_rspec(rspec_slice) + return result + + + def add_resource_to_slice(self, slicename, resource_hrn, leases=None): + """ + Get the list of resources' urn, build the rspec string and call the allocate + and provision method. + """ + resources_hrn_new = list() + resource_parts = resource_hrn.split('.') + resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:]) + resources_hrn_new.append(resource_hrn) + + slice_resources = self.get_slice_resources(slicename)['resource'] + + with self.lock: + if slice_resources: + slice_resources_hrn = self.get_resources_hrn(slice_resources) + for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems(): + s_parts = s_hrn_value.split('.') + s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:]) + resources_hrn_new.append(s_hrn) + + resources_urn = self._get_resources_urn(resources_hrn_new) + rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases) + f = open("/tmp/rspec_input.rspec", "w") + f.truncate(0) + f.write(rspec) + f.close() + + if not os.path.getsize("/tmp/rspec_input.rspec") > 0: + raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename) + + try: + self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec") + except: + raise RuntimeError("Fail to allocate resource for slice %s" % slicename) + try: + self._sfi_exec_method('provision', slicename) + except: + raise RuntimeError("Fail to provision resource for slice %s" % slicename) + return True + + def remove_resource_from_slice(self, slicename, resource_hrn, leases=None): + """ + Get the list of resources' urn, build the rspec string and call the allocate + and provision method. + """ + resource_urn = self._get_resources_urn([resource_hrn]).pop() + try: + self._sfi_exec_method('delete', slicename, urn=resource_urn) + except: + raise RuntimeError("Fail to delete resource for slice %s" % slicename) + return True + + + def _get_resources_urn(self, resources_hrn): + """ + Builds list of resources' urn based on hrn. + """ + resources_urn = list() + + for resource in resources_hrn: + resources_urn.append(hrn_to_urn(resource, 'node')) + + return resources_urn + + def blacklist_resource(self, resource_hrn): + self._blacklist.add(resource_hrn) + + def blacklisted(self): + return self._blacklist + + def unblacklist_resource(self, resource_hrn): + del self._blacklist[resource_hrn] + + def reserve_resource(self, resource_hrn): + self._reserved.add(resource_hrn) + + def reserved(self): + return self._reserved + + +class SFAAPIFactory(object): + """ + API Factory to manage a map of SFAAPI instances as key-value pairs, it + instanciate a single instance per key. The key represents the same SFA, + credentials. + """ + + _lock = threading.Lock() + _apis = dict() + + + @classmethod + def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, + timeout = None): + + if sfi_user and sfi_sm: + key = cls.make_key(sfi_user, sfi_sm) + with cls._lock: + api = cls._apis.get(key) + + if not api: + api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, + timeout) + cls._apis[key] = api + + return api + + return None + + @classmethod + def make_key(cls, *args): + skey = "".join(map(str, args)) + return hashlib.md5(skey).hexdigest() + diff --git a/src/nepi/util/sfarspec_proc.py b/src/nepi/util/sfarspec_proc.py new file mode 100644 index 00000000..40c8480d --- /dev/null +++ b/src/nepi/util/sfarspec_proc.py @@ -0,0 +1,197 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +from nepi.util.logger import Logger +try: + from sfa.rspecs.rspec import RSpec + from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn, urn_to_hrn +except ImportError: + log = Logger("SFA RSpec Processing") + log.debug("Package sfa-common not installed.\ + Could not import sfa.rspecs.rspec and sfa.util.xrn") + +from types import StringTypes, ListType + + +class SfaRSpecProcessing(object): + """ + Class to process SFA RSpecs, parse the RSpec replies such as Advertisement RSpecs, + and build in the case of Request RSpecs. + """ + def __init__(self, config=None): + self._log = Logger("SFA RSpec Processing") + self.config = config + + def make_dict_rec(self, obj): + if not obj or isinstance(obj, (StringTypes, bool)): + return obj + if isinstance(obj, list): + objcopy = [] + for x in obj: + objcopy.append(self.make_dict_rec(x)) + return objcopy + # We thus suppose we have a child of dict + objcopy = {} + for k, v in obj.items(): + objcopy[k] = self.make_dict_rec(v) + return objcopy + + def parse_sfa_rspec(self, rspec_string): + """ + Parse the RSpec XML as a string. + """ + # rspec_type and rspec_version should be set in the config of the platform, + # we use GENIv3 as default one if not + if self.config: + if 'rspec_type' and 'rspec_version' in self.config: + rspec_version = self.config['rspec_type'] + ' ' + self.config['rspec_version'] + else: + rspec_version = 'GENI 3' + self._log.debug(rspec_version) + rspec = RSpec(rspec_string, version=rspec_version) + + try: + nodes = rspec.version.get_nodes() + except Exception, e: + self._log.warn("Could not retrieve nodes in RSpec: %s" % e) + try: + leases = rspec.version.get_leases() + except Exception, e: + self._log.warn("Could not retrieve leases in RSpec: %s" % e) + try: + links = rspec.version.get_links() + except Exception, e: + self._log.warn("Could not retrieve links in RSpec: %s" % e) + try: + channels = rspec.version.get_channels() + except Exception, e: + self._log.warn("Could not retrieve channels in RSpec: %s" % e) + + resources = [] + # Extend object and Format object field's name + for node in nodes: + node['type'] = 'node' + node['network_hrn'] = Xrn(node['component_id']).authority[0] # network ? XXX + node['hrn'] = urn_to_hrn(node['component_id'])[0] + node['urn'] = node['component_id'] + node['hostname'] = node['component_name'] + node['initscripts'] = node.pop('pl_initscripts') + if 'exclusive' in node and node['exclusive']: + node['exclusive'] = node['exclusive'].lower() == 'true' + + # XXX This should use a MAP as before + if 'position' in node: # iotlab + node['x'] = node['position']['posx'] + node['y'] = node['position']['posy'] + node['z'] = node['position']['posz'] + del node['position'] + + if 'location' in node: + if node['location']: + node['latitude'] = node['location']['latitude'] + node['longitude'] = node['location']['longitude'] + del node['location'] + + # Flatten tags + if 'tags' in node: + if node['tags']: + for tag in node['tags']: + node[tag['tagname']] = tag['value'] + del node['tags'] + + + # We suppose we have children of dict that cannot be serialized + # with xmlrpc, let's make dict + resources.append(self.make_dict_rec(node)) + + # NOTE a channel is a resource and should not be treated independently + # resource + # | + # +----+------+-------+ + # | | | | + # node link channel etc. + #resources.extend(nodes) + #resources.extend(channels) + + return {'resource': resources, 'lease': leases } +# 'channel': channels \ +# } + + + def build_sfa_rspec(self, slice_id, resources, leases): + """ + Build the XML RSpec from list of resources' urns. + eg. resources = ["urn:publicid:IDN+ple:modenaple+node+planetlab-1.ing.unimo.it"] + """ + #if isinstance(resources, str): + # resources = eval(resources) + # rspec_type and rspec_version should be set in the config of the platform, + # we use GENIv3 as default one if not + if self.config: + if 'rspec_type' and 'rspec_version' in self.config: + rspec_version = self.config['rspec_type'] + ' ' + self.config['rspec_version'] + else: + rspec_version = 'GENI 3' + + # extend rspec version with "content_type" + rspec_version += ' request' + + rspec = RSpec(version=rspec_version) + + nodes = [] + channels = [] + links = [] + self._log.info("Building RSpec for resources %s" % resources) + for urn in resources: + # XXX TO BE CORRECTED, this handles None values + if not urn: + continue + self._log.info(urn) + resource = dict() + # TODO: take into account the case where we send a dict of URNs without keys + #resource['component_id'] = resource.pop('urn') + resource['component_id'] = urn + resource_hrn, resource_type = urn_to_hrn(resource['component_id']) + # build component_manager_id + top_auth = resource_hrn.split('.')[0] + cm = urn.split("+") + resource['component_manager_id'] = "%s+%s+authority+cm" % (cm[0],top_auth) + + if resource_type == 'node': + # XXX dirty hack WiLab !!! + if self.config: + if 'wilab2' in self.config['sm']: + resource['client_id'] = "PC" + resource['sliver_type'] = "raw-pc" + nodes.append(resource) + elif resource_type == 'link': + links.append(resource) + elif resource_type == 'channel': + channels.append(resource) + else: + raise Exception, "Not supported type of resource" + + rspec.version.add_nodes(nodes, rspec_content_type="request") + #rspec.version.add_leases(leases) + #rspec.version.add_links(links) + #rspec.version.add_channels(channels) + + self._log.info("request rspec: %s"%rspec.toxml()) + return rspec.toxml() + + diff --git a/test/lib/test_utils.py b/test/lib/test_utils.py index 57050f38..19405526 100644 --- a/test/lib/test_utils.py +++ b/test/lib/test_utils.py @@ -108,3 +108,17 @@ def skipIfNotPythonVersion(func): return wrapped +def skipIfNotSfaCredentials(func): + name = func.__name__ + def wrapped(*args, **kwargs): + sfa_user = os.environ.get("SFA_USER") + sfa_pk = os.environ.get("SFA_PK") + + if not (sfa_user and os.path.exists(os.path.expanduser(sfa_pk))): + print "*** WARNING: Skipping test %s: SFA path to private key doesn't exist\n" % name + return + + return func(*args, **kwargs) + + return wrapped + diff --git a/test/resources/linux/node.py b/test/resources/linux/node.py index e46f6673..27b7817e 100755 --- a/test/resources/linux/node.py +++ b/test/resources/linux/node.py @@ -298,7 +298,7 @@ main (void) dirpath = tempfile.mkdtemp() f = tempfile.NamedTemporaryFile(dir=dirpath, delete=False) f.close() - + f1 = tempfile.NamedTemporaryFile(delete=False) f1.close() f1.name @@ -310,7 +310,8 @@ main (void) node.copy(source, dest) command = "ls %s" % destdir - + + import pdb;pdb.set_trace() (out, err), proc = node.execute(command) os.remove(f1.name) diff --git a/test/resources/planetlab/sfa_node.py b/test/resources/planetlab/sfa_node.py new file mode 100755 index 00000000..cd6e6bb9 --- /dev/null +++ b/test/resources/planetlab/sfa_node.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Lucia Guevgeozian + +from nepi.execution.ec import ExperimentController + +from nepi.resources.planetlab.sfa_node import PlanetlabSfaNode +from nepi.util.sfaapi import SFAAPI, SFAAPIFactory + +from test_utils import skipIfNotSfaCredentials + +import os +import time +import unittest +import multiprocessing + + +class DummyEC(ExperimentController): + pass + +class PLSfaNodeFactoryTestCase(unittest.TestCase): + + def test_creation_phase(self): + self.assertEquals(PlanetlabSfaNode._rtype, "PlanetlabSfaNode") + self.assertEquals(len(PlanetlabSfaNode._attributes), 29) + +class PLSfaNodeTestCase(unittest.TestCase): + """ + This tests use inria_nepi slice, from the test instance of MyPLC + nepiplc.pl.sophia.inria.fr. This test can fail if the user running + the test does not have a user in this instance of MyPLC or is not + added to the inria_nepi slice. + """ + + def setUp(self): + self.ec = DummyEC() + self.username = os.environ.get('SFA_SLICE') + self.sfauser = os.environ.get('SFA_USER') + self.sfaPrivateKey = os.environ.get('SFA_PK') + + @skipIfNotSfaCredentials + def test_a_sfaapi(self): + """ + Check that the api to discover and reserve resources is well + instanciated, and is an instance of SFAAPI. Check that using + the same credentials, the same object of the api is used. + """ + node1 = self.ec.register_resource("PlanetlabSfaNode") + self.ec.set(node1, "hostname", "planetlab2.ionio.gr") + self.ec.set(node1, "username", self.username) + self.ec.set(node1, "sfauser", self.sfauser) + self.ec.set(node1, "sfaPrivateKey", self.sfaPrivateKey) + + plnode_rm1 = self.ec.get_resource(node1) + + self.assertIsNone(plnode_rm1._node_to_provision) + + api1 = plnode_rm1.sfaapi + self.assertIsInstance(api1, SFAAPI) + self.assertEquals(len(api1.reserved()), 0) + self.assertEquals(len(api1.blacklisted()), 0) + + node2 = self.ec.register_resource("PlanetlabSfaNode") + self.ec.set(node2, "hostname", "planetlab2.ionio.gr") + self.ec.set(node2, "username", self.username) + self.ec.set(node2, "sfauser", self.sfauser) + self.ec.set(node2, "sfaPrivateKey", self.sfaPrivateKey) + + plnode_rm2 = self.ec.get_resource(node2) + api2 = plnode_rm2.sfaapi + self.assertEquals(api1, api2) + + @skipIfNotSfaCredentials + def test_discover(self): + """ + Check that the method do_discover reserve the right node. + """ + node = self.ec.register_resource("PlanetlabSfaNode") + self.ec.set(node, "hostname", "planetlab2.ionio.gr") + self.ec.set(node, "username", self.username) + self.ec.set(node, "sfauser", self.sfauser) + self.ec.set(node, "sfaPrivateKey", self.sfaPrivateKey) + + plnode_rm = self.ec.get_resource(node) + + hostname = plnode_rm.get("hostname") + self.assertIsNotNone(hostname) + + self.assertEquals(plnode_rm.sfaapi.reserved(), set()) + + plnode_rm.do_discover() + self.assertEquals(plnode_rm.sfaapi.reserved().pop(), 'ple.dbislab.planetlab2.ionio.gr') + self.assertEquals(plnode_rm._node_to_provision, 'ple.dbislab.planetlab2.ionio.gr') + + @skipIfNotSfaCredentials + def test_provision(self): + """ + This test checks that the method do_provision add the node in the slice and check + its well functioning. + """ + node = self.ec.register_resource("PlanetlabSfaNode") + self.ec.set(node, "hostname", "planetlab2.ionio.gr") + self.ec.set(node, "username", self.username) + self.ec.set(node, "sfauser", self.sfauser) + self.ec.set(node, "sfaPrivateKey", self.sfaPrivateKey) + + plnode_rm = self.ec.get_resource(node) + + self.assertEquals(plnode_rm.sfaapi.reserved(), set()) + self.assertIsNone(plnode_rm._node_to_provision) + + slicename = 'ple.' + self.username.replace('_', '.') + + plnode_rm.do_discover() + plnode_rm.do_provision() + + cmd = 'echo "IT WORKED"' + ((out, err), proc) = plnode_rm.execute(cmd) + self.assertEquals(out.strip(), "IT WORKED") + + urn_to_delete = 'urn:publicid:IDN+ple:dbislab+node+planetlab2.ionio.gr' + plnode_rm.sfaapi.remove_resource_from_slice(slicename, urn_to_delete) + + slice_resources = plnode_rm.sfaapi.get_slice_resources(slicename)['resource'] + if slice_resources: + slice_resources_hrn = plnode_rm.sfaapi.get_resources_hrn(slice_resources) + self.assertNotIn('planetlab2.ionio.gr', slice_resources_hrn.keys()) + + @skipIfNotSfaCredentials + def test_xdeploy(self): + """ + Test with the nodes being discover and provision at the same time. + The deploy should fail as the test before, there aren't 4 nodes of + that carachteristics. + """ + node = self.ec.register_resource("PlanetlabSfaNode") + self.ec.set(node, "hostname", "planetlab2.ionio.gr") + self.ec.set(node, "username", self.username) + self.ec.set(node, "sfauser", self.sfauser) + self.ec.set(node, "sfaPrivateKey", self.sfaPrivateKey) + + self.ec.deploy() + self.ec.wait_deployed(node) + state = self.ec.state(node) + self.assertEquals(state, 3) + + def tearDown(self): + self.ec.shutdown() + + +if __name__ == '__main__': + unittest.main() + + + -- 2.43.0