Add SFA support for PLE using hostname
author    Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
Fri, 21 Mar 2014 10:20:55 +0000 (11:20 +0100)
committer Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
Fri, 21 Mar 2014 10:20:55 +0000 (11:20 +0100)
13 files changed:
Makefile
examples/linux/ccn/two_nodes_file_retrieval.py [changed mode: 0644->0755]
examples/planetlab/sfa.py [new file with mode: 0755]
src/nepi/resources/linux/ccn/ccnd.py
src/nepi/resources/linux/ccn/ccnr.py
src/nepi/resources/linux/node.py
src/nepi/resources/planetlab/sfa_node.py [new file with mode: 0644]
src/nepi/util/sfa_api.py [deleted file]
src/nepi/util/sfaapi.py [new file with mode: 0644]
src/nepi/util/sfarspec_proc.py [new file with mode: 0644]
test/lib/test_utils.py
test/resources/linux/node.py
test/resources/planetlab/sfa_node.py [new file with mode: 0755]

index 24e5b15..039820b 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -11,11 +11,7 @@ SUBBUILDDIR = $(shell python -c 'import distutils.util, sys; \
 PYTHON25 := $(shell python -c 'import sys; v = sys.version_info; \
     print (1 if v[0] <= 2 and v[1] <= 5 else 0)')
 
-ifeq ($(PYTHON25),0)
-BUILDDIR := $(BUILDDIR)/$(SUBBUILDDIR)
-else
 BUILDDIR := $(BUILDDIR)/lib
-endif
 
 PYPATH = $(BUILDDIR):$(TESTLIB):$(PYTHONPATH)
 COVERAGE = $(or $(shell which coverage), $(shell which python-coverage), \
old mode 100644 (file)
new mode 100755 (executable)
index 8e40c3b..1d406d2
@@ -1,3 +1,4 @@
+#!/usr/bin/env python
 #
 #    NEPI, a framework to manage network experiments
 #    Copyright (C) 2014 INRIA
@@ -31,8 +32,9 @@ from nepi.execution.ec import ExperimentController
 
 import os
 
-ssh_key = ####### <<< ASSING the absolute path to the private SSH key to login into the remote host >>>
-ssh_user = ####### <<< ASSING the SSH username >>>
+#ssh_key = ####### <<< ASSIGN the absolute path to the private SSH key to login into the remote host >>>
+#ssh_user = ####### <<< ASSIGN the SSH username >>>
+ssh_user = "icnuser"
 
 ## Create the experiment controller
 ec = ExperimentController(exp_id = "demo_CCN")
@@ -40,28 +42,32 @@ ec = ExperimentController(exp_id = "demo_CCN")
 ## Register node 1
 node1 = ec.register_resource("LinuxNode")
 # Set the hostname of the first node to use for the experiment
-hostname1 = "peeramidion.irisa.fr" ##### <<< ASSIGN the hostname of a host you have SSSH access to >>>
+#hostname1 = "peeramidion.irisa.fr" ##### <<< ASSIGN the hostname of a host you have SSSH access to >>>
+hostname1 = "133.69.33.148"
 ec.set(node1, "hostname", hostname1)
 # username should be your SSH user 
 ec.set(node1, "username", ssh_user)
 # Absolute path to the SSH private key
-ec.set(node1, "identity", ssh_key)
+#ec.set(node1, "identity", ssh_key)
 # Clean all files, results, etc, from previous experiments with the same exp_id
-ec.set(node1, "cleanExperiment", True)
+#ec.set(node1, "cleanExperiment", True)
+ec.set(node1, "cleanHome", True)
 # Kill all running processes in the node before running the experiment
 ec.set(node1, "cleanProcesses", True)
 
 ## Register node 2 
 node2 = ec.register_resource("LinuxNode")
 # Set the hostname of the second node to use for the experiment
-hostname2 = "planetlab2.upc.es" ##### <<< ASSIGN the hostname of a host you have SSSH access to >>>
+#hostname2 = "planetlab2.upc.es" ##### <<< ASSIGN the hostname of a host you have SSSH access to >>>
+hostname2 = "133.69.33.149"
 ec.set(node2, "hostname", hostname2)
 # username should be your SSH user 
 ec.set(node2, "username", ssh_user)
 # Absolute path to the SSH private key
-ec.set(node2, "identity", ssh_key)
+#ec.set(node2, "identity", ssh_key)
 # Clean all files, results, etc, from previous experiments with the same exp_id
-ec.set(node2, "cleanExperiment", True)
+#ec.set(node2, "cleanExperiment", True)
+ec.set(node2, "cleanHome", True)
 # Kill all running processes in the node before running the experiment
 ec.set(node2, "cleanProcesses", True)
 
@@ -69,12 +75,14 @@ ec.set(node2, "cleanProcesses", True)
 ccnd1 = ec.register_resource("LinuxCCND")
 # Set ccnd log level to 7
 ec.set(ccnd1, "debug", 7)
+ec.set(ccnd1, "port", 9597)
 ec.register_connection(ccnd1, node1)
 
 ## Register a CCN daemon in node 2
 ccnd2 = ec.register_resource("LinuxCCND")
 # Set ccnd log level to 7
 ec.set(ccnd2, "debug", 7)
+ec.set(ccnd2, "port", 9597)
 ec.register_connection(ccnd2, node2)
 
 ## Register a repository in node 1
@@ -97,12 +105,14 @@ ec.register_connection(co, ccnr1)
 
 # Register a FIB entry from node 1 to node 2
 entry1 = ec.register_resource("LinuxFIBEntry")
-ec.set(entry1, "host", hostname2)
+ec.set(entry1, "host", "10.0.32.2")
+ec.set(entry1, "port", 9597)
 ec.register_connection(entry1, ccnd1)
 
 # Register a FIB entry from node 2 to node 1
 entry2 = ec.register_resource("LinuxFIBEntry")
-ec.set(entry2, "host", hostname1)
+ec.set(entry2, "host", "10.0.0.2")
+ec.set(entry2, "port", 9597)
 ec.register_connection(entry2, ccnd2)
 
 ## Retrieve the file stored in node 1 from node 2
diff --git a/examples/planetlab/sfa.py b/examples/planetlab/sfa.py
new file mode 100755 (executable)
index 0000000..c5b393e
--- /dev/null
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
+
+from nepi.execution.ec import ExperimentController
+import os
+
+# Create the EC
+exp_id = "sfa_test"
+ec = ExperimentController(exp_id)
+
+username = os.environ.get('SFA_SLICE')
+sfauser = os.environ.get('SFA_USER')
+sfaPrivateKey = os.environ.get('SFA_PK')
+
+# server
+node1 = ec.register_resource("PlanetlabSfaNode")
+ec.set(node1, "hostname", 'planetlab-4.imperial.ac.uk')
+ec.set(node1, "username", username)
+ec.set(node1, "sfauser", sfauser)
+ec.set(node1, "sfaPrivateKey", sfaPrivateKey)
+ec.set(node1, "cleanHome", True)
+ec.set(node1, "cleanProcesses", True)
+
+app1 = ec.register_resource("LinuxApplication")
+command = "ping -c5 google.com" 
+ec.set(app1, "command", command)
+ec.register_connection(app1, node1)
+
+# Deploy
+ec.deploy()
+
+ec.wait_finished([app1])
+
+ec.shutdown()
+
+# End
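The script above reads its credentials from the environment. A minimal sketch of setting them before launching it (the slice hrn, user hrn and key path below are hypothetical placeholders, not values from this commit):

    import os

    # Hypothetical credentials; substitute your own slice hrn, user hrn and key path.
    os.environ['SFA_SLICE'] = 'ple.inria.my_slice'
    os.environ['SFA_USER'] = 'ple.inria.my_user'
    os.environ['SFA_PK'] = os.path.expanduser('~/.sfi/my_user.pkey')

With those exported, examples/planetlab/sfa.py deploys the node through SFA and runs the ping experiment to completion.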
index ffaee50..44bc276 100644 (file)
@@ -136,6 +136,10 @@ class LinuxCCND(LinuxApplication):
     def path(self):
         return "PATH=$PATH:${BIN}/%s/" % self.version 
 
+    @property
+    def environment(self):
+        return self._environment()
+
     def do_deploy(self):
         if not self.node or self.node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
index 9ac5cce..e839a56 100644 (file)
@@ -296,6 +296,7 @@ class LinuxCCNR(LinuxApplication):
             })
 
         env = self.ccnd.path
+        #env = self.ccnd.path + self.ccnd.environment
         env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), self.get(k)) \
             if self.get(k) else "", envs.keys()))
        
index 8ad69f3..13cf076 100644 (file)
@@ -681,7 +681,7 @@ class LinuxNode(ResourceManager):
         # If dst files should not be overwritten, check that the files do not
         # exist already
         if isinstance(src, str):
-            src = map(str.strip, src.split(";"))
+            src = map(os.path.expanduser, map(str.strip, src.split(";")))
     
         if overwrite == False:
             src = self.filter_existing_files(src, dst)
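With this change, each source path is tilde-expanded on the controller before the overwrite check. A quick illustration of the transformation (the home directory shown is hypothetical):

    import os

    src = "~/exp/a.bin; ~/exp/b.bin"
    src = map(os.path.expanduser, map(str.strip, src.split(";")))
    # -> ['/home/user/exp/a.bin', '/home/user/exp/b.bin'] when HOME is /home/user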
diff --git a/src/nepi/resources/planetlab/sfa_node.py b/src/nepi/resources/planetlab/sfa_node.py
new file mode 100644 (file)
index 0000000..bb823d5
--- /dev/null
@@ -0,0 +1,635 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay 
+from nepi.resources.linux.node import LinuxNode
+from nepi.util.sfaapi import SFAAPIFactory 
+from nepi.util.execfuncs import lexec
+from nepi.util import sshfuncs
+
+from random import randint
+import time
+import socket
+import threading
+import datetime
+
+@clsinit_copy
+class PlanetlabSfaNode(LinuxNode):
+    _rtype = "PlanetlabSfaNode"
+    _help = "Controls a PlanetLab host accessible using a SSH key " \
+            "and provisioned using SFA"
+    _backend = "planetlab"
+
+    lock = threading.Lock()
+
+    @classmethod
+    def _register_attributes(cls):
+
+        sfa_user = Attribute("sfauser", "SFA user",
+                    flags = Flags.Credential)
+
+        sfa_private_key = Attribute("sfaPrivateKey", "Path to the private key \
+                            used to generate the user credential")
+
+        city = Attribute("city", "Constrain location (city) during resource \
+                discovery. May use wildcards.",
+                flags = Flags.Filter)
+
+        country = Attribute("country", "Constrain location (country) during \
+                    resource discovery. May use wildcards.",
+                    flags = Flags.Filter)
+
+        region = Attribute("region", "Constrain location (region) during \
+                    resource discovery. May use wildcards.",
+                    flags = Flags.Filter)
+
+        architecture = Attribute("architecture", "Constrain architecture \
+                        during resource discovery.",
+                        type = Types.Enumerate,
+                        allowed = ["x86_64", 
+                                    "i386"],
+                        flags = Flags.Filter)
+
+        operating_system = Attribute("operatingSystem", "Constrain operating \
+                            system during resource discovery.",
+                            type = Types.Enumerate,
+                            allowed =  ["f8",
+                                        "f12",
+                                        "f14",
+                                        "centos",
+                                        "other"],
+                            flags = Flags.Filter)
+
+        min_reliability = Attribute("minReliability", "Constrain reliability \
+                            while picking PlanetLab nodes. Specifies a lower \
+                            acceptable bound.",
+                            type = Types.Double,
+                            range = (1, 100),
+                            flags = Flags.Filter)
+
+        max_reliability = Attribute("maxReliability", "Constrain reliability \
+                            while picking PlanetLab nodes. Specifies an upper \
+                            acceptable bound.",
+                            type = Types.Double,
+                            range = (1, 100),
+                            flags = Flags.Filter)
+
+        min_bandwidth = Attribute("minBandwidth", "Constrain available \
+                            bandwidth while picking PlanetLab nodes. \
+                            Specifies a lower acceptable bound.",
+                            type = Types.Double,
+                            range = (0, 2**31),
+                            flags = Flags.Filter)
+
+        max_bandwidth = Attribute("maxBandwidth", "Constrain available \
+                            bandwidth while picking PlanetLab nodes. \
+                            Specifies an upper acceptable bound.",
+                            type = Types.Double,
+                            range = (0, 2**31),
+                            flags = Flags.Filter)
+
+        min_load = Attribute("minLoad", "Constrain node load average while \
+                    picking PlanetLab nodes. Specifies a lower acceptable \
+                    bound.",
+                    type = Types.Double,
+                    range = (0, 2**31),
+                    flags = Flags.Filter)
+
+        max_load = Attribute("maxLoad", "Constrain node load average while \
+                    picking PlanetLab nodes. Specifies an upper acceptable \
+                    bound.",
+                    type = Types.Double,
+                    range = (0, 2**31),
+                    flags = Flags.Filter)
+
+        min_cpu = Attribute("minCpu", "Constrain available cpu time while \
+                    picking PlanetLab nodes. Specifies a lower acceptable \
+                    bound.",
+                    type = Types.Double,
+                    range = (0, 100),
+                    flags = Flags.Filter)
+
+        max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
+                    picking PlanetLab nodes. Specifies an upper acceptable \
+                    bound.",
+                    type = Types.Double,
+                    range = (0, 100),
+                    flags = Flags.Filter)
+
+        timeframe = Attribute("timeframe", "Past time period in which to check\
+                        information about the node. Values are year, month, \
+                        week, latest",
+                        default = "week",
+                        type = Types.Enumerate,
+                        allowed = ["latest",
+                                    "week",
+                                    "month",
+                                    "year"],
+                        flags = Flags.Filter)
+
+#        plblacklist = Attribute("blacklist", "Take into account the file plblacklist \
+#                        in the user's home directory under .nepi directory. This file \
+#                        contains a list of PL nodes to blacklist, and at the end \
+#                        of the experiment execution the new blacklisted nodes are added.",
+#                    type = Types.Bool,
+#                    default = True,
+#                    flags = Flags.ReadOnly)
+#
+
+        cls._register_attribute(sfa_user)
+        cls._register_attribute(sfa_private_key)
+        cls._register_attribute(city)
+        cls._register_attribute(country)
+        cls._register_attribute(region)
+        cls._register_attribute(architecture)
+        cls._register_attribute(operating_system)
+        cls._register_attribute(min_reliability)
+        cls._register_attribute(max_reliability)
+        cls._register_attribute(min_bandwidth)
+        cls._register_attribute(max_bandwidth)
+        cls._register_attribute(min_load)
+        cls._register_attribute(max_load)
+        cls._register_attribute(min_cpu)
+        cls._register_attribute(max_cpu)
+        cls._register_attribute(timeframe)
+
+    def __init__(self, ec, guid):
+        super(PlanetlabSfaNode, self).__init__(ec, guid)
+
+        self._sfaapi = None
+        self._node_to_provision = None
+        self._slicenode = False
+        self._hostname = False
+
+        if self.get("gateway") or self.get("gatewayUser"):
+            self.set("gateway", None)
+            self.set("gatewayUser", None)
+
+    def _skip_provision(self):
+        sfa_user = self.get("sfauser")
+        if not sfa_user:
+            return True
+        else: return False
+    
+    @property
+    def sfaapi(self):
+        if not self._sfaapi:
+            sfa_user = self.get("sfauser")
+            sfa_sm = "http://sfa3.planet-lab.eu:12346/"
+            sfa_auth = '.'.join(sfa_user.split('.')[:2])
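+            # e.g. an sfa_user hrn "ple.inria.lucia" yields the authority "ple.inria"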
+            sfa_registry = "http://sfa3.planet-lab.eu:12345/"
+            sfa_private_key = self.get("sfaPrivateKey")
+
+            self._sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
+                sfa_registry, sfa_sm, sfa_private_key)
+            
+            if not self._sfaapi:
+                self.fail_sfaapi()
+
+        return self._sfaapi
+
+    def do_discover(self):
+        """
+        Based on the attributes defined by the user, discover the suitable 
+        nodes for provision.
+        """
+        if self._skip_provision():
+            super(PlanetlabSfaNode, self).do_discover()
+            return
+
+        nodes = self.sfaapi.get_resources_hrn()
+
+        hostname = self._get_hostname()
+        if hostname:
+            # the user specified one particular node to be provisioned
+            self._hostname = True
+            host_hrn = nodes[hostname]
+            self.debug("Selected node hrn: %s" % host_hrn)
+
+            # check that the node is not blacklisted or being provisioned
+            # by another RM
+            with PlanetlabSfaNode.lock:
+                plist = self.sfaapi.reserved()
+                blist = self.sfaapi.blacklisted()
+                if host_hrn not in blist and host_hrn not in plist:
+                
+                    # check that the node is really alive by pinging it
+                    ping_ok = self._do_ping(hostname)
+                    if not ping_ok:
+                        self._blacklist_node(host_hrn)
+                        self.fail_node_not_alive(hostname)
+                    else:
+                        if self._check_if_in_slice([host_hrn]):
+                            self._slicenode = True
+                        self._put_node_in_provision(host_hrn)
+                        self._node_to_provision = host_hrn
+                else:
+                    self.fail_node_not_available(hostname)
+            super(PlanetlabSfaNode, self).do_discover()
+        
+        else:
+            # the user specifies constraints based on attributes, zero, one or 
+            # more nodes can match these constraints 
+            nodes = self._filter_based_on_attributes()
+
+            # nodes that are already part of the user's slice have priority to
+            # be provisioned
+            nodes_inslice = self._check_if_in_slice(nodes)
+            nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
+            
+            node_id = None
+            if nodes_inslice:
+                node_id = self._choose_random_node(nodes_inslice)
+                self._slicenode = True                
+                
+            if not node_id:
+                # Either there were no matching nodes in the user's slice, or
+                # the nodes in the slice were blacklisted or being provisioned
+                # by another RM. Note nodes_not_inslice is never empty
+                node_id = self._choose_random_node(nodes_not_inslice)
+                self._slicenode = False
+
+            if node_id:
+                self._node_to_provision = node_id
+                try:
+                    self._set_hostname_attr(node_id)
+                    self.info(" Selected node to provision ")
+                    super(PlanetlabSfaNode, self).do_discover()
+                except:
+                    with PlanetlabSfaNode.lock:
+                        self._blacklist_node(node_id)
+                    self.do_discover()
+            else:
+               self.fail_not_enough_nodes() 
+            
+    def do_provision(self):
+        """
+        Add node to user's slice after verifying that the node is functioning
+        correctly.
+        """
+        if self._skip_provision():
+            super(PlanetlabSfaNode, self).do_provision()
+            return
+
+        provision_ok = False
+        ssh_ok = False
+        proc_ok = False
+        timeout = 1800
+
+        while not provision_ok:
+            node = self._node_to_provision
+            if not self._slicenode:
+                self._add_node_to_slice(node)
+            
+                # check ssh connection
+                t = 0 
+                while t < timeout and not ssh_ok:
+
+                    cmd = 'echo \'GOOD NODE\''
+                    ((out, err), proc) = self.execute(cmd)
+                    if out.find("GOOD NODE") < 0:
+                        t = t + 60
+                        time.sleep(60)
+                        continue
+                    else:
+                        ssh_ok = True
+                        continue
+            else:
+                cmd = 'echo \'GOOD NODE\''
+                ((out, err), proc) = self.execute(cmd)
+                if not out.find("GOOD NODE") < 0:
+                    ssh_ok = True
+
+            if not ssh_ok:
+                # the timeout was reached without establishing an ssh connection
+                # the node is blacklisted, deleted from the slice, and a new
+                # node to provision is discovered
+                with PlanetlabSfaNode.lock:
+                    self.warn(" Could not SSH login ")
+                    self._blacklist_node(node)
+                    #self._delete_node_from_slice(node)
+                self.do_discover()
+                continue
+            
+            # check /proc directory is mounted (ssh_ok = True)
+            # and file system is not read only
+            else:
+                cmd = 'mount |grep proc'
+                ((out1, err1), proc1) = self.execute(cmd)
+                cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
+                ((out2, err2), proc2) = self.execute(cmd)
+                if out1.find("/proc type proc") < 0 or \
+                    "Read-only file system".lower() in err2.lower():
+                    with PlanetlabSfaNode.lock:
+                        self.warn(" Corrupted file system ")
+                        self._blacklist_node(node)
+                        #self._delete_node_from_slice(node)
+                    self.do_discover()
+                    continue
+            
+                else:
+                    provision_ok = True
+                    if not self.get('hostname'):
+                        self._set_hostname_attr(node)            
+                    self.info(" Node provisioned ")            
+            
+        super(PlanetlabSfaNode, self).do_provision()
+
+    def _filter_based_on_attributes(self):
+        """
+        Retrieve the list of node hrns that match the user's constraints 
+        """
+        # Map user's defined attributes with tagnames of PlanetLab
+        timeframe = self.get("timeframe")[0]
+        attr_to_tags = {
+            'city' : 'city',
+            'country' : 'country',
+            'region' : 'region',
+            'architecture' : 'arch',
+            'operatingSystem' : 'fcdistro',
+            'minReliability' : 'reliability%s' % timeframe,
+            'maxReliability' : 'reliability%s' % timeframe,
+            'minBandwidth' : 'bw%s' % timeframe,
+            'maxBandwidth' : 'bw%s' % timeframe,
+            'minLoad' : 'load%s' % timeframe,
+            'maxLoad' : 'load%s' % timeframe,
+            'minCpu' : 'cpu%s' % timeframe,
+            'maxCpu' : 'cpu%s' % timeframe,
+        }
+        
+        nodes_hrn = []
+        filters = {}
+
+        for attr_name, attr_obj in self._attrs.iteritems():
+            attr_value = self.get(attr_name)
+            
+            if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
+                attr_name != 'timeframe':
+        
+                attr_tag = attr_to_tags[attr_name]
+                filters['tagname'] = attr_tag
+
+                # filter nodes by fixed constraints e.g. operating system
+                if not 'min' in attr_name and not 'max' in attr_name:
+                    filters['value'] = attr_value
+                    nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
+
+                # filter nodes by range constraints e.g. max bandwidth
+                elif 'min' in attr_name or 'max' in attr_name:
+                    nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
+
+        if not filters:
+            nodes = self.sfaapi.get_resources_hrn()
+            for node in nodes:
+                nodes_hrn.append(nodes[node])
+        return nodes_hrn
+                    
+    def _filter_by_fixed_attr(self, filters, nodes_hrn):
+        """
+        Query SFA API for nodes matching fixed attributes defined by the
+        user
+        """
+        pass
+#        node_tags = self.sfaapi.get_resources_tags(filters)
+#        if node_tags is not None:
+#
+#            if len(nodes_id) == 0:
+#                # first attribute being matched
+#                for node_tag in node_tags:
+#                    nodes_id.append(node_tag['node_id'])
+#            else:
+#                # remove the nodes ids that don't match the new attribute
+#                # that is being match
+#
+#                nodes_id_tmp = []
+#                for node_tag in node_tags:
+#                    if node_tag['node_id'] in nodes_id:
+#                        nodes_id_tmp.append(node_tag['node_id'])
+#
+#                if len(nodes_id_tmp):
+#                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
+#                else:
+#                    # no node from before match the new constraint
+#                    self.fail_discovery()
+#        else:
+#            # no nodes match the filter applied
+#            self.fail_discovery()
+#
+#        return nodes_id
+
+    def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
+        """
+        Query PLCAPI for nodes ids matching attributes defined in a certain
+        range, by the user
+        """
+        pass
+#        node_tags = self.plapi.get_node_tags(filters)
+#        if node_tags:
+#            
+#            if len(nodes_id) == 0:
+#                # first attribute being matched
+#                for node_tag in node_tags:
+# 
+#                   # check that matches the min or max restriction
+#                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
+#                        float(node_tag['value']) > attr_value:
+#                        nodes_id.append(node_tag['node_id'])
+#
+#                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
+#                        float(node_tag['value']) < attr_value:
+#                        nodes_id.append(node_tag['node_id'])
+#            else:
+#
+#                # remove the nodes ids that don't match the new attribute
+#                # that is being match
+#                nodes_id_tmp = []
+#                for node_tag in node_tags:
+#
+#                    # check that matches the min or max restriction and was a
+#                    # matching previous filters
+#                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
+#                        float(node_tag['value']) > attr_value and \
+#                        node_tag['node_id'] in nodes_id:
+#                        nodes_id_tmp.append(node_tag['node_id'])
+#
+#                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
+#                        float(node_tag['value']) < attr_value and \
+#                        node_tag['node_id'] in nodes_id:
+#                        nodes_id_tmp.append(node_tag['node_id'])
+#
+#                if len(nodes_id_tmp):
+#                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
+#                else:
+#                    # no node from before match the new constraint
+#                    self.fail_discovery()
+#
+#        else: #TODO CHECK
+#            # no nodes match the filter applied
+#            self.fail_discovery()
+#
+#        return nodes_id
+        
+    def _choose_random_node(self, nodes):
+        """
+        From the possible nodes for provision, choose one randomly to decrease the
+        probability of different RMs choosing the same node for provision
+        """
+        size = len(nodes)
+        while size:
+            size = size - 1
+            index = randint(0, size)
+            node_id = nodes[index]
+            nodes[index] = nodes[size]
+
+            # check the node is not blacklisted or being provisioned by another
+            # RM, and ping it to check that it is really alive
+            with PlanetlabSfaNode.lock:
+
+                blist = self.sfaapi.blacklisted()
+                plist = self.sfaapi.reserved()
+                if node_id not in blist and node_id not in plist:
+                    ping_ok = self._do_ping(node_id)
+                    if not ping_ok:
+                        self._set_hostname_attr(node_id)
+                        self.warn(" Node not responding PING ")
+                        self._blacklist_node(node_id)
+                    else:
+                        # discovered node for provision, added to provision list
+                        self._put_node_in_provision(node_id)
+                        return node_id
+
+    def _get_nodes_id(self, filters=None):
+        return self.plapi.get_nodes(filters, fields=['node_id'])
+
+    def _add_node_to_slice(self, host_hrn):
+        self.info(" Adding node to slice ")
+        slicename = self.get("username").replace('_', '.')
+        slicename = 'ple.' + slicename
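+        # e.g. the login "inria_nepi" becomes the slice hrn "ple.inria.nepi"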
+        self.sfaapi.add_resource_to_slice(slicename, host_hrn)
+
+    def _delete_node_from_slice(self, node):
+        self.warn(" Deleting node from slice ")
+        slicename = 'ple.' + self.get("username").replace('_', '.')
+        self.sfaapi.remove_resource_from_slice(slicename, node)
+
+    def _get_hostname(self):
+        hostname = self.get("hostname")
+        if hostname:
+            return hostname
+        else:
+            return None
+
+    def _set_hostname_attr(self, node):
+        """
+        Query the SFA API for the hostname matching a host hrn and set the
+        hostname attribute, overwriting the previous value
+        """
+        hosts_hrn = self.sfaapi.get_resources_hrn()
+        for hostname, hrn in hosts_hrn.iteritems():
+            if hrn == node:
+                self.set("hostname", hostname)
+
+    def _check_if_in_slice(self, hosts_hrn):
+        """
+        Check using SFA API if any host hrn from hosts_hrn is in the user's
+        slice
+        """
+        slicename = self.get("username").replace('_', '.')
+        slicename = 'ple.' + slicename
+        slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
+        slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes)
+        nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
+        return nodes_inslice
+
+    def _do_ping(self, hostname):
+        """
+        Perform ping command on node's IP matching hostname
+        """
+        ping_ok = False
+        ip = self._get_ip(hostname)
+        if not ip: return ping_ok
+
+        command = "ping -c4 %s" % ip
+
+        (out, err) = lexec(command)
+        if not str(out).find("2 received") < 0 or not str(out).find("3 received") < 0 or not \
+            str(out).find("4 received") < 0:
+            ping_ok = True
+       
+        return ping_ok 
+
+    def _blacklist_node(self, host_hrn):
+        """
+        Add a malfunctioning node to the blacklist
+        """
+        self.warn(" Blacklisting malfunctioning node ")
+        self.sfaapi.blacklist_resource(host_hrn)
+        if not self._hostname:
+            self.set('hostname', None)
+
+    def _put_node_in_provision(self, host_hrn):
+        """
+        Add node to the list of nodes being provisioned, in order for other RMs
+        to not try to provision the same one again
+        """
+        self.sfaapi.reserve_resource(host_hrn)
+
+    def _get_ip(self, hostname):
+        """
+        Resolve the IP address of the node matching the hostname
+        """
+        try:
+            ip = sshfuncs.gethostbyname(hostname)
+        except:
+            # Fail while trying to find the IP
+            return None
+        return ip
+
+    def _get_nodes_hrn(self):
+        nodes = self.sfaapi.get_resources_hrn()
+        return nodes
+
+    def fail_discovery(self):
+        msg = "Discovery failed. No candidates found for node"
+        self.error(msg)
+        raise RuntimeError, msg
+
+    def fail_node_not_alive(self, hostname=None):
+        msg = "Node %s not alive" % hostname
+        raise RuntimeError, msg
+    
+    def fail_node_not_available(self, hostname):
+        msg = "Node %s not available for provisioning" % hostname
+        raise RuntimeError, msg
+
+    def fail_not_enough_nodes(self):
+        msg = "Not enough nodes available for provisioning"
+        raise RuntimeError, msg
+
+    def fail_sfaapi(self):
+        msg = "Failing while trying to instantiate the SFA API.\nSet the" + \
+            " attributes sfauser and sfaPrivateKey."
+        raise RuntimeError, msg
+
+    def valid_connection(self, guid):
+        # TODO: Validate!
+        return True
+
+
diff --git a/src/nepi/util/sfa_api.py b/src/nepi/util/sfa_api.py
deleted file mode 100644 (file)
index e77c725..0000000
+++ /dev/null
@@ -1,286 +0,0 @@
-#
-#    NEPI, a framework to manage network experiments
-#    Copyright (C) 2013 INRIA
-#
-#    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
-#
-#    This program is distributed in the hope that it will be useful,
-#    but WITHOUT ANY WARRANTY; without even the implied warranty of
-#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-#    GNU General Public License for more details.
-#
-#    You should have received a copy of the GNU General Public License
-#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-#
-# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-#         Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
-
-
-import logging
-import hashlib
-
-from sfa_sfav1 import SFAResourcesParser
-import subprocess
-
-# TODO: Use nepi utils Logger instead of warnings!
-import warnings
-
-import threading
-
-class SFAApi(object):
-
-    def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None,
-            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
-    
-        self._resources = dict()
-        self._reservable_resources = list()
-        self._leases = dict()
-        self._slice_tags = dict()
-        self._slice_resources = set()
-        self._slice_leases = set()
-        self._aggregate = aggregate
-        self._slice_hrn = slice_id
-        # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3
-        # For now is SFA V1 from PlanetLab and Nitos (wrong namespace)
-        self._parser = sfa_sfav1.SFAResourcesParser(['ple', 'omf'])
-        self._lock = threading.Lock()
-
-        # Paremeters to contact the XMLRPC SFA service
-        self._sfi_parameters = {'-a': sfi_auth, '-u': sfi_user,
-                '-r': sfi_registry, '-s': sfi_sm, '-t': timeout,
-                '-k': private_key}
-
-        #self._logger = logging.getLogger('nepi.utils.sfiapi')
-        self._fetch_resources_info()
-        self._fetch_slice_info()
-
-    def _sfi_command_options(self):
-        command_options = " ".join("%s %s" % (k,v) for (k,v) in \
-                self._sfi_parameters.iteritems() if v is not None)
-        return command_options
-
-    def _sfi_command_exec(self, command):
-        args = command.split(" ")
-        s = subprocess.Popen(args, stdout = subprocess.PIPE,
-                stdin = subprocess.PIPE)
-        xml, err = s.communicate()
-        if err:
-           raise RuntimeError("Command excecution problem, error: %s", err)
-        return xml
-
-    def _fetch_resources_info(self, resources = True):
-        command_options = self._sfi_command_options()
-        command = "sfi.py " + command_options + " resources -l all"
-        try:
-            xml = self._sfi_command_exec(command)
-        except:
-            #self._logger.error("Error in SFA responds: %s", xml)
-            raise
-        if resources:
-            self._resources, self._leases = self._parser.resources_from_xml(xml, resources = True)
-        else:
-            self._leases = self._parser.resources_from_xml(xml)
-        #self._update_reservable()
-        return xml
-    
-    def _fetch_slice_info(self):
-        command_options = self._sfi_command_options()
-        command = "sfi.py " + command_options + " resources -l all"
-        command = command + " " + self._slice_hrn
-        try:
-            xml = self._sfi_command_exec(command)
-        except:
-            #self._logger.error("Error in SFA responds: %s", xml)
-            raise
-        self._slice_resources, self._slice_leases, self._slice_tags = \
-            self._parser.resources_from_xml(xml, sliver = True, resources = True)
-        return xml
-
-    def _update_reservable(self):
-        for rid, r in self._resources.iteritems():
-            if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE') \
-                 or (r['resource_type'] == 'channel'):
-                self._reservable_resources.append(rid)
-
-
-    def discover_resources(self, resourceId=None, fields=[], **kwargs):
-        result = dict()
-        resources = self._resources
-
-        if resourceId is not None:
-            resource_ids = resourceId
-            if not isinstance(resource_ids, list):
-                resource_ids = [resource_ids]
-            resources = self._filter_by_resourceId(resources, resource_ids)
-        else:
-            for filter, value in kwargs.items():
-                resources = self._filter_by_filter(resources, filter, value)
-        if not fields:
-            return resources
-        else:
-            for k, info in resources.iteritems():
-                info = self._extract_fields(info, fields)
-                result[k] = info
-            return result
-                
-    def _filter_by_resourceId(self, resources, resource_ids):
-        return dict((k, resources[k]) for k in resource_ids if k in resources)
-
-    def _filter_by_filter(self, resources, filter, value):
-        d = dict()
-        for k in resources.keys():
-            if filter in resources[k]:
-                if resources[k][filter] == value:
-                    d[k] = resources[k]
-        return d
-               
-    def _extract_fields(self, info, fields):
-        return dict((k, info[k]) for k in fields if k in info)
-
-    def discover_fields(self):
-        resources = self._resources
-        fields = []
-        for k, data in resources.iteritems():
-            for field in data:
-                if field not in fields:
-                    fields.append(field)
-        return fields
-
-    def discover_leases(self, resourceId=None):
-        leases = self._leases
-
-        if resourceId is not None:
-            resource_ids = resourceId
-            if not isinstance(resourceId, list):
-                resource_ids = [resource_ids]
-            leases = self._filterbyresourceId(leases, resource_ids)
-        return leases
-
-    def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot):
-        result = dict()
-        if rtype not in ['node', 'channel']:
-            raise RuntimeError("Unknown type")
-
-        finish_time = start_time + duration * slot
-
-        leases_resources = dict()
-        reservable_resources = dict()
-        for lid, lease in leases.iteritems():
-            if lease[0]['type'] == rtype:
-                leases_resources.update({lid: lease})
-        #print leases_resources
-        for rid, resource in resources.iteritems():
-            if rtype == 'node' and (resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE'):
-                reservable_resources.update({rid: resource})
-            elif rtype == 'channel':
-                reservable_resources.update({rid: resource})
-            #if resource['type'] == 'rtype' and resources['exclusive'].upper() == 'TRUE':\
-            # (in case adding exclusive tag to channels)
-
-        free_resources = list(set(reservable_resources.keys()) - set(leases_resources.keys()))
-    
-        if len(free_resources) >= quantity:
-            free_resources = free_resources[:quantity]
-            for rid, resource in resources.iteritems():
-                if rid in free_resources:
-                    result[rid] = resource
-            return result
-        else:
-            maybe_free = []
-            new_quan = quantity - len(free_resources)
-            print new_quan
-
-            for lid, lease in leases_resources.iteritems():
-                for l in lease:
-                    st = int(l['start_time'])
-                    ft = st + int(l['duration']) * slot
-                    if (st <= finish_time <= ft) or (st <= start_time <= ft):
-                        if lid in maybe_free:
-                            maybe_free.remove(lid)
-                        break
-                    else:
-                        if lid not in maybe_free:
-                            maybe_free.append(lid)
-                if len(maybe_free) >= new_quan:
-                    free_resources = [free_resources, maybe_free]
-                    free_resources = sum(free_resources, [])
-                    for rid, resource in resources.iteritems():
-                        if rid in free_resources:
-                            result[rid] = resource
-                        return result
-                    #return free_resources
-            warnings.warn("There aren't enough nodes")
-
-                                 
-    def provision_resource(self, new_resource, start_time = None, duration = None):
-        import os, tempfile
-        with self._lock:
-            xml = self._fetch_slice_info()
-            new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,\
-            new_resource, start_time, duration, self._aggregate)
-            fh, fname = tempfile.mkstemp()
-            print fname
-            os.write(fh, new_xml)
-            os.close(fh)
-            try:
-                command_options = self._sfi_command_options()
-                command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
-                out = self._sfi_command_exec(command)
-            except:
-                raise
-        xml = self._fetch_slice_info()
-        return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource, start_time,\
-                duration, self._aggregate)
-
-    def release_resource(self, resource, start_time = None, duration = None):
-        import os, tempfile
-        with self._lock:
-            xml = self._fetch_slice_info()
-            new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn, resource,\
-            start_time, duration, self._aggregate)
-            fh, fname = tempfile.mkstemp()
-            print fname
-            os.write(fh, new_xml)
-            os.close(fh)
-            try:
-                command_options = self._sfi_command_options()
-                command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
-                out = self._sfi_command_exec(command)
-            except:
-                raise
-        xml = self._fetch_slice_info()
-        return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource, start_time,\
-            duration, self._aggregate)
-
-
-class SFAApiFactory(object):
-    lock = threading.Lock()
-    _apis = dict()
-
-    @classmethod
-    def get_api(slice_id = None, sfi_auth = None, sfi_user = None,
-            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
-
-        key = cls.make_key(slice_id, sfi_auth, sfi_user, sfi_registry,
-                           sfi_sm, timeout, private_key, aggregate = 'ple')
-        api = cls._apis.get(key)
-        cls.lock.acquire()
-        api._fetch_resources_info(resources = False)
-        api._fetch_slice_info()
-        cls.lock.release()
-
-        if not api:
-            api = SFAApi(slice_id = None, sfi_auth = None, sfi_user = None,
-            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None)
-            cls._apis[key] = api
-
-        return api
-
-    @classmethod
-    def make_key(cls, *args):
-        skey = "".join(map(str, args))
-        return hashlib.md5(skey).hexdigest()
-
diff --git a/src/nepi/util/sfaapi.py b/src/nepi/util/sfaapi.py
new file mode 100644 (file)
index 0000000..12e6af0
--- /dev/null
@@ -0,0 +1,258 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
+
+import threading
+import hashlib
+import re
+import os
+
+from nepi.util.logger import Logger
+
+try:
+    from sfa.client.sfi import Sfi
+    from sfa.util.xrn import hrn_to_urn
+except ImportError:
+    log = Logger("SFA API")
+    log.debug("Packages sfa-common or sfa-client not installed.\
+         Could not import sfa.client.sfi or sfa.util.xrn")
+
+from nepi.util.sfarspec_proc import SfaRSpecProcessing
+
+class SFAAPI(object):
+    """
+    API for querying the SFA service.
+    """
+    def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
+        timeout):
+
+        self.sfi_user = sfi_user
+        self.sfi_auth = sfi_auth
+        self.sfi_registry = sfi_registry
+        self.sfi_sm = sfi_sm
+        self.private_key = private_key
+        self.timeout = timeout
+
+        self._blacklist = set()
+        self._reserved = set()
+        self._resources_cache = None
+        self._already_cached = False
+        self._log = Logger("SFA API")
+        self.api = Sfi()
+        self.rspec_proc = SfaRSpecProcessing()
+        self.lock = threading.Lock()
+
+    def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None):
+        """
+        Execute sfi method.
+        """
+        if command in ['describe', 'delete', 'allocate', 'provision']:
+            if not slicename:
+                raise TypeError("The slice hrn is expected for this method %s" % command)
+            if command == 'allocate' and not rspec:
+                raise TypeError("RSpec is expected for this method %s" % command)
+            
+            if command == 'allocate':
+                args_list = [slicename, rspec]
+            elif command == 'delete':
+                args_list = [slicename, urn]
+            else: args_list = [slicename, '-o', '/tmp/rspec_output']
+
+        elif command == 'resources':
+            args_list = ['-o', '/tmp/rspec_output']
+
+        else: raise TypeError("Sfi method not supported")
+
+        self.api.options.timeout = self.timeout
+        self.api.options.raw = None
+        self.api.options.user = self.sfi_user
+        self.api.options.auth = self.sfi_auth
+        self.api.options.registry = self.sfi_registry
+        self.api.options.sm = self.sfi_sm
+        self.api.options.user_private_key = self.private_key
+
+        self.api.command = command
+        self.api.command_parser = self.api.create_parser_command(self.api.command)
+        (command_options, command_args) = self.api.command_parser.parse_args(args_list)
+        self.api.command_options = command_options
+        self.api.read_config()
+        self.api.bootstrap()
+
+        self.api.dispatch(command, command_options, command_args)
+        with open("/tmp/rspec_output.rspec", "r") as result_file:
+            result = result_file.read()
+        return result
+
+    def get_resources_info(self):
+        """
+        Get all resources and their attributes from the aggregate.
+        """
+        try:
+            rspec_slice = self._sfi_exec_method('resources')
+        except:
+            raise RuntimeError("Fail to list resources")
+   
+        self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
+        self._already_cached = True
+        return self._resources_cache
+
+    def get_resources_hrn(self, resources=None):
+        """
+        Get the list of resource hrns, without the resource info.
+        """
+        if not resources:
+            if not self._already_cached:
+                resources = self.get_resources_info()['resource']
+            else:
+                resources = self._resources_cache['resource']
+
+        component_tohrn = dict()
+        for resource in resources:
+            hrn = resource['hrn'].replace('\\', '')
+            component_tohrn[resource['component_name']] = hrn
+
+        return component_tohrn
+            
+    def get_slice_resources(self, slicename):
+        """
+        Get resources and info from slice.
+        """
+        try:
+            with self.lock:
+                rspec_slice = self._sfi_exec_method('describe', slicename)
+        except:
+            raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
+
+        result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
+        return result
+
+
+    def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
+        """
+        Get the list of resources' urn, build the rspec string and call the allocate 
+        and provision method.
+        """
+        resources_hrn_new = list()
+        resource_parts = resource_hrn.split('.')
+        resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
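+        # dots inside the node's leaf name are escaped, e.g. a hypothetical
+        # "ple.auth.node1.example.org" becomes "ple.auth.node1\.example\.org"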
+        resources_hrn_new.append(resource_hrn)
+
+        slice_resources = self.get_slice_resources(slicename)['resource']
+
+        with self.lock:
+            if slice_resources:
+                slice_resources_hrn = self.get_resources_hrn(slice_resources)
+                for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems():
+                    s_parts = s_hrn_value.split('.')
+                    s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
+                    resources_hrn_new.append(s_hrn)
+
+            resources_urn = self._get_resources_urn(resources_hrn_new)
+            rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
+            f = open("/tmp/rspec_input.rspec", "w")
+            f.truncate(0)
+            f.write(rspec)
+            f.close()
+            
+            if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
+                raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
+
+            try:
+                self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
+            except:
+                raise RuntimeError("Fail to allocate resource for slice %s" % slicename)            
+            try:
+                self._sfi_exec_method('provision', slicename) 
+            except:
+                raise RuntimeError("Fail to provision resource for slice %s" % slicename)
+            return True
+
+    def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
+        """
+        Get the resource urn and call the delete method to remove the
+        resource from the slice.
+        """
+        resource_urn = self._get_resources_urn([resource_hrn]).pop()
+        try:
+            self._sfi_exec_method('delete', slicename, urn=resource_urn)
+        except:
+            raise RuntimeError("Fail to delete resource for slice %s" % slicename)
+        return True
+
+
+    def _get_resources_urn(self, resources_hrn):
+        """
+        Builds list of resources' urn based on hrn.
+        """
+        resources_urn = list()
+
+        for resource in resources_hrn:
+            resources_urn.append(hrn_to_urn(resource, 'node'))
+            
+        return resources_urn
+
+    def blacklist_resource(self, resource_hrn):
+        self._blacklist.add(resource_hrn)
+
+    def blacklisted(self):
+        return self._blacklist
+
+    def unblacklist_resource(self, resource_hrn):
+        self._blacklist.discard(resource_hrn)
+
+    def reserve_resource(self, resource_hrn):
+        self._reserved.add(resource_hrn)
+
+    def reserved(self):
+        return self._reserved
+
+
+class SFAAPIFactory(object):
+    """
+    API factory to manage a map of SFAAPI instances as key-value pairs; it
+    instantiates a single instance per key. The key is derived from the
+    SFA credentials.
+    """
+
+    _lock = threading.Lock()
+    _apis = dict()
+
+   
+    @classmethod
+    def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
+            timeout = None):
+
+        if sfi_user and sfi_sm:
+            key = cls.make_key(sfi_user, sfi_sm)
+            with cls._lock:
+                api = cls._apis.get(key)
+
+                if not api:
+                    api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
+                        timeout)
+                    cls._apis[key] = api
+
+                return api
+
+        return None
+
+    @classmethod
+    def make_key(cls, *args):
+        skey = "".join(map(str, args))
+        return hashlib.md5(skey).hexdigest()
+
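A minimal sketch of how this module is driven by the new RM, mirroring the calls PlanetlabSfaNode makes (the registry and SM URLs are the PLE endpoints hard-coded in the RM; the user hrn, key path, slice and hostname are hypothetical):

    from nepi.util.sfaapi import SFAAPIFactory

    api = SFAAPIFactory.get_api("ple.inria.my_user", "ple.inria",
            "http://sfa3.planet-lab.eu:12345/", "http://sfa3.planet-lab.eu:12346/",
            "~/.sfi/my_user.pkey")
    hosts = api.get_resources_hrn()              # maps component_name -> hrn
    api.add_resource_to_slice("ple.inria.my_slice",
            hosts["planetlab-4.imperial.ac.uk"])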
diff --git a/src/nepi/util/sfarspec_proc.py b/src/nepi/util/sfarspec_proc.py
new file mode 100644 (file)
index 0000000..40c8480
--- /dev/null
@@ -0,0 +1,197 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+from nepi.util.logger import Logger
+try:
+    from sfa.rspecs.rspec import RSpec
+    from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn, urn_to_hrn
+except ImportError:
+    log = Logger("SFA RSpec Processing")
+    log.debug("Package sfa-common not installed.\
+         Could not import sfa.rspecs.rspec and sfa.util.xrn")
+
+from types import StringTypes, ListType
+
+
+class SfaRSpecProcessing(object):
+    """
+    Class to process SFA RSpecs: parse RSpec replies such as Advertisement
+    RSpecs, and build Request RSpecs.
+    """
+    def __init__(self, config=None):
+        self._log = Logger("SFA RSpec Processing")
+        self.config = config 
+
+    def make_dict_rec(self, obj):
+        if not obj or isinstance(obj, (StringTypes, bool)):
+            return obj
+        if isinstance(obj, list):
+            objcopy = []
+            for x in obj:
+                objcopy.append(self.make_dict_rec(x))
+            return objcopy
+        # We thus suppose we have a child of dict
+        objcopy = {}
+        for k, v in obj.items():
+            objcopy[k] = self.make_dict_rec(v)
+        return objcopy
+
+    def parse_sfa_rspec(self, rspec_string):
+        """
+        Parse the RSpec XML as a string.
+        """
+        # rspec_type and rspec_version should be set in the config of the
+        # platform; we use GENI v3 as the default if not
+        rspec_version = 'GENI 3'
+        if self.config and 'rspec_type' in self.config and 'rspec_version' in self.config:
+            rspec_version = self.config['rspec_type'] + ' ' + self.config['rspec_version']
+        self._log.debug(rspec_version)
+        rspec = RSpec(rspec_string, version=rspec_version)
+
+        # initialize to empty lists so that a failed lookup below does
+        # not leave any of these names undefined
+        nodes, leases, links, channels = [], [], [], []
+        try:
+            nodes = rspec.version.get_nodes()
+        except Exception, e:
+            self._log.warn("Could not retrieve nodes in RSpec: %s" % e)
+        try:
+            leases = rspec.version.get_leases()
+        except Exception, e:
+            self._log.warn("Could not retrieve leases in RSpec: %s" % e)
+        try:
+            links = rspec.version.get_links()
+        except Exception, e:
+            self._log.warn("Could not retrieve links in RSpec: %s" % e)
+        try:
+            channels = rspec.version.get_channels()
+        except Exception, e:
+            self._log.warn("Could not retrieve channels in RSpec: %s" % e)
+
+        resources = [] 
+        # Extend each node object and normalize its field names
+        for node in nodes:
+            node['type'] = 'node'
+            node['network_hrn'] = Xrn(node['component_id']).authority[0] # network ? XXX
+            node['hrn'] = urn_to_hrn(node['component_id'])[0]
+            node['urn'] = node['component_id']
+            node['hostname'] = node['component_name']
+            node['initscripts'] = node.pop('pl_initscripts', None)
+            if 'exclusive' in node and node['exclusive']:
+                node['exclusive'] = node['exclusive'].lower() == 'true'
+            # XXX This should use a MAP as before
+            if 'position' in node: # iotlab
+                node['x'] = node['position']['posx']
+                node['y'] = node['position']['posy']
+                node['z'] = node['position']['posz']
+                del node['position']
+            if 'location' in node:
+                if node['location']:
+                    node['latitude'] = node['location']['latitude']
+                    node['longitude'] = node['location']['longitude']
+                del node['location']
+            # Flatten tags
+            if 'tags' in node:
+                if node['tags']:
+                    for tag in node['tags']:
+                        node[tag['tagname']] = tag['value']
+                del node['tags']
+            
+            # We suppose we have children of dict that cannot be serialized
+            # with xmlrpc, let's make dict
+            resources.append(self.make_dict_rec(node))
+        # NOTE a channel is a resource and should not be treated independently
+        #     resource
+        #        |
+        #   +----+------+-------+
+        #   |    |      |       |
+        # node  link  channel  etc.
+        #resources.extend(nodes)
+        #resources.extend(channels)
+        # a 'channel' entry could be added here as well, once channels are
+        # modeled as regular resources (see the NOTE above)
+        return {'resource': resources, 'lease': leases}
+
+    def build_sfa_rspec(self, slice_id, resources, leases):
+        """
+        Build the XML RSpec from list of resources' urns.
+        eg. resources = ["urn:publicid:IDN+ple:modenaple+node+planetlab-1.ing.unimo.it"]
+        """
+        #if isinstance(resources, str):
+        #    resources = eval(resources)
+        # rspec_type and rspec_version should be set in the config of the
+        # platform; we use 'GENI 3' as the default when they are not
+        rspec_version = 'GENI 3'
+        if self.config and 'rspec_type' in self.config and 'rspec_version' in self.config:
+            rspec_version = self.config['rspec_type'] + ' ' + self.config['rspec_version']
+
+        # extend rspec version with "content_type"
+        rspec_version += ' request'
+        
+        rspec = RSpec(version=rspec_version)
+
+        nodes = []
+        channels = []
+        links = []
+        self._log.info("Building RSpec for resources %s" % resources)
+        for urn in resources:
+            # XXX TO BE CORRECTED: for now, skip None entries in the list
+            if not urn:
+                continue
+            self._log.info(urn)
+            resource = dict()
+            # TODO: take into account the case where we send a dict of URNs without keys
+            #resource['component_id'] = resource.pop('urn')
+            resource['component_id'] = urn
+            resource_hrn, resource_type = urn_to_hrn(resource['component_id'])
+            # build component_manager_id
+            top_auth = resource_hrn.split('.')[0]
+            cm = urn.split("+")
+            resource['component_manager_id'] = "%s+%s+authority+cm" % (cm[0],top_auth)
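+            # e.g. for urn "urn:publicid:IDN+ple:modenaple+node+planetlab-1.ing.unimo.it",
+            # top_auth is "ple" and component_manager_id becomes
+            # "urn:publicid:IDN+ple+authority+cm"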
+
+            if resource_type == 'node':
+                # XXX dirty hack WiLab !!!
+                if self.config:
+                    if 'wilab2' in self.config['sm']:
+                        resource['client_id'] = "PC"
+                        resource['sliver_type'] = "raw-pc"
+                nodes.append(resource)
+            elif resource_type == 'link':
+                links.append(resource)
+            elif resource_type == 'channel':
+                channels.append(resource)
+            else:
+                raise Exception("Unsupported resource type: %s" % resource_type)
+
+        rspec.version.add_nodes(nodes, rspec_content_type="request")
+        #rspec.version.add_leases(leases)
+        #rspec.version.add_links(links)
+        #rspec.version.add_channels(channels)
+
+        self._log.info("request rspec: %s" % rspec.toxml())
+        return rspec.toxml()
+
+
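A short sketch of how this processor can be driven, assuming the sfa-common
package is installed (the slice id "my_slice" is a hypothetical placeholder;
the node URN is the one from the docstring above):

    from nepi.util.sfarspec_proc import SfaRSpecProcessing

    proc = SfaRSpecProcessing()  # no config -> rspec version defaults to 'GENI 3'
    urns = ["urn:publicid:IDN+ple:modenaple+node+planetlab-1.ing.unimo.it"]
    request_xml = proc.build_sfa_rspec("my_slice", urns, [])

    # an Advertisement RSpec string returned by an aggregate can then be
    # parsed back into plain dictionaries:
    #   parsed = proc.parse_sfa_rspec(advertisement_xml)
    #   parsed['resource'] -> list of node dicts, parsed['lease'] -> leases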
index 57050f3..1940552 100644 (file)
@@ -108,3 +108,17 @@ def skipIfNotPythonVersion(func):
 
     return wrapped
 
+def skipIfNotSfaCredentials(func):
+    name = func.__name__
+    def wrapped(*args, **kwargs):
+        sfa_user = os.environ.get("SFA_USER")
+        sfa_pk = os.environ.get("SFA_PK")
+        
+        if not (sfa_user and sfa_pk and os.path.exists(os.path.expanduser(sfa_pk))):
+            print "*** WARNING: Skipping test %s: SFA user not set or private key doesn't exist\n" % name
+            return
+
+        return func(*args, **kwargs)
+
+    return wrapped
+
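The decorator keys off the SFA_USER and SFA_PK environment variables; the SFA
test cases further below also read SFA_SLICE. A sketch of the environment one
might set before running them (all values are placeholders):

    import os

    # placeholder values; use your own user HRN, key path and slice name
    os.environ["SFA_USER"] = "ple.inria.my_user"
    os.environ["SFA_PK"] = "~/.sfi/my_user.pkey"  # expanded with expanduser
    os.environ["SFA_SLICE"] = "inria_nepi"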
index e46f667..27b7817 100755 (executable)
@@ -298,7 +298,7 @@ main (void)
         dirpath = tempfile.mkdtemp()
         f = tempfile.NamedTemporaryFile(dir=dirpath, delete=False)
         f.close()
-      
+     
         f1 = tempfile.NamedTemporaryFile(delete=False)
         f1.close()
         f1.name
@@ -310,7 +310,7 @@
         node.copy(source, dest)
 
         command = "ls %s" % destdir
-        
+       
         (out, err), proc = node.execute(command)
 
         os.remove(f1.name)
diff --git a/test/resources/planetlab/sfa_node.py b/test/resources/planetlab/sfa_node.py
new file mode 100755 (executable)
index 0000000..cd6e6bb
--- /dev/null
@@ -0,0 +1,171 @@
+#!/usr/bin/env python
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
+
+from nepi.execution.ec import ExperimentController
+
+from nepi.resources.planetlab.sfa_node import PlanetlabSfaNode
+from nepi.util.sfaapi import SFAAPI, SFAAPIFactory
+
+from test_utils import skipIfNotSfaCredentials
+
+import os
+import time
+import unittest
+import multiprocessing
+
+
+class DummyEC(ExperimentController):
+    pass
+
+class PLSfaNodeFactoryTestCase(unittest.TestCase):
+
+    def test_creation_phase(self):
+        self.assertEquals(PlanetlabSfaNode._rtype, "PlanetlabSfaNode")
+        self.assertEquals(len(PlanetlabSfaNode._attributes), 29)
+
+class PLSfaNodeTestCase(unittest.TestCase):
+    """
+    These tests use the inria_nepi slice from the test instance of MyPLC
+    at nepiplc.pl.sophia.inria.fr. They can fail if the user running the
+    tests does not have an account in this MyPLC instance or is not
+    added to the inria_nepi slice.
+    """
+
+    def setUp(self):
+        self.ec = DummyEC()
+        self.username = os.environ.get('SFA_SLICE')
+        self.sfauser = os.environ.get('SFA_USER')
+        self.sfaPrivateKey = os.environ.get('SFA_PK')
+        
+    @skipIfNotSfaCredentials
+    def test_a_sfaapi(self):
+        """
+        Check that the API used to discover and reserve resources is
+        properly instantiated and is an instance of SFAAPI. Also check
+        that, given the same credentials, the same API object is reused.
+        """
+        node1 = self.ec.register_resource("PlanetlabSfaNode")
+        self.ec.set(node1, "hostname", "planetlab2.ionio.gr")
+        self.ec.set(node1, "username", self.username)
+        self.ec.set(node1, "sfauser", self.sfauser)
+        self.ec.set(node1, "sfaPrivateKey", self.sfaPrivateKey)
+
+        plnode_rm1 = self.ec.get_resource(node1)
+
+        self.assertIsNone(plnode_rm1._node_to_provision)
+
+        api1 = plnode_rm1.sfaapi
+        self.assertIsInstance(api1, SFAAPI)
+        self.assertEquals(len(api1.reserved()), 0)
+        self.assertEquals(len(api1.blacklisted()), 0)
+
+        node2 = self.ec.register_resource("PlanetlabSfaNode")
+        self.ec.set(node2, "hostname", "planetlab2.ionio.gr")
+        self.ec.set(node2, "username", self.username)
+        self.ec.set(node2, "sfauser", self.sfauser)
+        self.ec.set(node2, "sfaPrivateKey", self.sfaPrivateKey)
+
+        plnode_rm2 = self.ec.get_resource(node2)
+        api2 = plnode_rm2.sfaapi
+        self.assertEquals(api1, api2)
+    
+    @skipIfNotSfaCredentials
+    def test_discover(self):
+        """
+        Check that the do_discover method reserves the right node.
+        """
+        node = self.ec.register_resource("PlanetlabSfaNode")
+        self.ec.set(node, "hostname", "planetlab2.ionio.gr")
+        self.ec.set(node, "username", self.username)
+        self.ec.set(node, "sfauser", self.sfauser)
+        self.ec.set(node, "sfaPrivateKey", self.sfaPrivateKey)
+
+        plnode_rm = self.ec.get_resource(node)
+       
+        hostname = plnode_rm.get("hostname")
+        self.assertIsNotNone(hostname)
+
+        self.assertEquals(plnode_rm.sfaapi.reserved(), set())
+
+        plnode_rm.do_discover()
+        self.assertEquals(plnode_rm.sfaapi.reserved().pop(), 'ple.dbislab.planetlab2.ionio.gr')
+        self.assertEquals(plnode_rm._node_to_provision, 'ple.dbislab.planetlab2.ionio.gr')
+
+    @skipIfNotSfaCredentials
+    def test_provision(self):
+        """
+        This test checks that the do_provision method adds the node to the
+        slice and verifies that the node is functioning.
+        """
+        node = self.ec.register_resource("PlanetlabSfaNode")
+        self.ec.set(node, "hostname", "planetlab2.ionio.gr")
+        self.ec.set(node, "username", self.username)
+        self.ec.set(node, "sfauser", self.sfauser)
+        self.ec.set(node, "sfaPrivateKey", self.sfaPrivateKey)
+
+        plnode_rm = self.ec.get_resource(node)
+
+        self.assertEquals(plnode_rm.sfaapi.reserved(), set())
+        self.assertIsNone(plnode_rm._node_to_provision)
+
+        slicename = 'ple.' + self.username.replace('_', '.')
+
+        plnode_rm.do_discover()
+        plnode_rm.do_provision()    
+
+        cmd = 'echo "IT WORKED"'
+        ((out, err), proc) = plnode_rm.execute(cmd)
+        self.assertEquals(out.strip(), "IT WORKED")
+
+        urn_to_delete = 'urn:publicid:IDN+ple:dbislab+node+planetlab2.ionio.gr'
+        plnode_rm.sfaapi.remove_resource_from_slice(slicename, urn_to_delete)
+
+        slice_resources = plnode_rm.sfaapi.get_slice_resources(slicename)['resource']
+        if slice_resources:
+            slice_resources_hrn = plnode_rm.sfaapi.get_resources_hrn(slice_resources)
+            self.assertNotIn('planetlab2.ionio.gr', slice_resources_hrn.keys())           
+
+    @skipIfNotSfaCredentials
+    def test_xdeploy(self):
+        """
+        Test discovering and provisioning the node in one step, as both are
+        triggered together by ec.deploy(). After deployment the node is
+        expected to reach state 3 (deployed and ready).
+        """
+        node = self.ec.register_resource("PlanetlabSfaNode")
+        self.ec.set(node, "hostname", "planetlab2.ionio.gr")
+        self.ec.set(node, "username", self.username)
+        self.ec.set(node, "sfauser", self.sfauser)
+        self.ec.set(node, "sfaPrivateKey", self.sfaPrivateKey)
+
+        self.ec.deploy()
+        self.ec.wait_deployed(node)
+        state = self.ec.state(node)
+        self.assertEquals(state, 3)
+
+    def tearDown(self):
+        self.ec.shutdown()
+
+
+if __name__ == '__main__':
+    unittest.main()
+
+
+