Planetlab + sfa node changes
authorLucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
Thu, 22 May 2014 13:44:20 +0000 (15:44 +0200)
committerLucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
Thu, 22 May 2014 13:44:20 +0000 (15:44 +0200)
src/nepi/resources/planetlab/node.py
src/nepi/resources/planetlab/sfa_node.py
src/nepi/util/sfaapi.py

index 1f706a8..7a8d5a7 100644 (file)
@@ -390,7 +390,7 @@ class PlanetlabNode(LinuxNode):
 
     def do_release(self):
         super(PlanetlabNode, self).do_release()
-        if self.state == ResourceState.RELEASED:
+        if self.state == ResourceState.RELEASED and not self._skip_provision():
             self.debug(" Releasing PLC API ")
             self.plapi.release()
 
index 9a0684b..59f1df0 100644 (file)
@@ -237,6 +237,7 @@ class PlanetlabSfaNode(LinuxNode):
                         self.fail_node_not_alive(hostname)
                     else:
                         if self._check_if_in_slice([host_hrn]):
+                            self.debug("The node %s is already in the slice" % hostname)
                             self._slicenode = True
                         self._node_to_provision = host_hrn
                         super(PlanetlabSfaNode, self).do_discover()
@@ -278,12 +279,12 @@ class PlanetlabSfaNode(LinuxNode):
 #    
     def _blacklisted(self, host_hrn):
         if self.sfaapi.blacklisted(host_hrn):
-           self.fail_node_not_available(hostname)
+           self.fail_node_not_available(host_hrn)
         return False
 
     def _reserved(self, host_hrn):
         if self.sfaapi.reserved(host_hrn):
-            self.fail_node_not_available(hostname)
+            self.fail_node_not_available(host_hrn)
         return False
             
     def do_provision(self):
@@ -330,7 +331,7 @@ class PlanetlabSfaNode(LinuxNode):
                 # the timeout was reach without establishing ssh connection
                 # the node is blacklisted, deleted from the slice, and a new
                 # node to provision is discovered
-                self.warn(" Could not SSH login ")
+                self.warning(" Could not SSH login ")
                 self._blacklist_node(node)
                 self.do_discover()
                 continue
@@ -344,7 +345,7 @@ class PlanetlabSfaNode(LinuxNode):
                 ((out2, err2), proc2) = self.execute(cmd)
                 if out1.find("/proc type proc") < 0 or \
                     "Read-only file system".lower() in err2.lower():
-                    self.warn(" Corrupted file system ")
+                    self.warning(" Corrupted file system ")
                     self._blacklist_node(node)
                     self.do_discover()
                     continue
@@ -529,11 +530,12 @@ class PlanetlabSfaNode(LinuxNode):
         slicename = 'ple.' + slicename
         self.sfaapi.add_resource_to_slice(slicename, host_hrn)
 
-#    def _delete_node_from_slice(self, node):
-#        self.warn(" Deleting node from slice ")
-#        slicename = self.get("username")
-#        self.plapi.delete_slice_node(slicename, [node])
-#
+    def _delete_from_slice(self):
+        self.warning(" Deleting node from slice ")
+        slicename = self.get("username").replace('_', '.')
+        slicename = 'ple.' + slicename
+        self.sfaapi.remove_all_from_slice(slicename)
+
     def _get_hostname(self):
         hostname = self.get("hostname")
         if hostname:
@@ -559,8 +561,10 @@ class PlanetlabSfaNode(LinuxNode):
         slicename = self.get("username").replace('_', '.')
         slicename = 'ple.' + slicename
         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
-        slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes)
-        nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn.values()))
+        if slice_nodes:
+            slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
+        else: slice_nodes_hrn = []
+        nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
         return nodes_inslice
 
     def _do_ping(self, hostname):
index d2888fd..48dfb48 100644 (file)
@@ -39,12 +39,20 @@ class SFAAPI(object):
     API for quering the SFA service.
     """
     def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
-        timeout):
+        batch, rtype, timeout):
 
         self._blacklist = set()
         self._reserved = set()
         self._resources_cache = None
         self._already_cached = False
+        self._ec = ec 
+
+        if batch:
+            self._testbed_res = rtype
+            self._count = 0
+            self._total = self._get_total_res()
+            self._slice_resources_batch = list()
+
         self._log = Logger("SFA API")
         self.api = Sfi()
         self.rspec_proc = SfaRSpecProcessing()
@@ -73,6 +81,15 @@ class SFAAPI(object):
                 for host in hosts_tobl:
                     self._blacklist.add(host)
 
+    def _get_total_res(self):
+        rms = list()
+        res_gids = self._ec.resources
+        for gid in res_gids:
+            rm = self._ec.get_resource(gid)
+            if self._testbed_res.lower() in rm._rtype.lower():
+                rms.append(rm)
+        return rms
+
     def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None):
         """
         Execute sfi method.
@@ -85,9 +102,10 @@ class SFAAPI(object):
             
             if command == 'allocate':
                 args_list = [slicename, rspec]
-            elif command == 'delete':
-                args_list = [slicename, urn]
-            else: args_list = [slicename, '-o', '/tmp/rspec_output']
+            else:
+                args_list = [slicename]
+            if command != 'delete':
+                args_list = args_list + ['-o', '/tmp/rspec_output']
 
         elif command == 'resources':
             args_list = ['-o', '/tmp/rspec_output']
@@ -97,17 +115,23 @@ class SFAAPI(object):
         self.api.command = command
         self.api.command_parser = self.api.create_parser_command(self.api.command)
         (command_options, command_args) = self.api.command_parser.parse_args(args_list)
-        #print "1 %s" % command_options.info
-        #command_options.info = ""
-        #print "2 %s" % command_options.info
         self.api.command_options = command_options
         self.api.read_config()
         self.api.bootstrap()
 
-        self.api.dispatch(command, command_options, command_args)
-        with open("/tmp/rspec_output.rspec", "r") as result_file:
-            result = result_file.read()
-        return result
+        try:
+            os.remove("/tmp/rspec_output.rspec")
+        except OSError:
+            self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
+
+        try:
+            self.api.dispatch(command, command_options, command_args)
+            with open("/tmp/rspec_output.rspec", "r") as result_file:
+                result = result_file.read()
+                return result
+        except:
+            self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
+            return None
 
     def get_resources_info(self):
         """
@@ -147,10 +171,13 @@ class SFAAPI(object):
             with self.lock_slice:
                 rspec_slice = self._sfi_exec_method('describe', slicename)
         except:
-            raise RuntimeError("Fail to describe resource for slice %s" % slicename)
+            self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)
 
-        result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
-        return result
+        if rspec_slice is not None:
+            result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
+            return result
+        else:
+            return {'resource':[],'lease':[]}
 
 
     def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
@@ -163,9 +190,11 @@ class SFAAPI(object):
         resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
         resources_hrn_new.append(resource_hrn)
 
-        slice_resources = self.get_slice_resources(slicename)['resource']
-
         with self.lock_slice:
+            rspec_slice = self._sfi_exec_method('describe', slicename)
+            if rspec_slice is not None:
+                slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
+            else: slice_resources = []
             if slice_resources:
                 slice_resources_hrn = self.get_resources_hrn(slice_resources)
                 for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems():
@@ -173,6 +202,7 @@ class SFAAPI(object):
                     s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
                     resources_hrn_new.append(s_hrn)
 
+
             resources_urn = self._get_resources_urn(resources_hrn_new)
             rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
             f = open("/tmp/rspec_input.rspec", "w")
@@ -183,15 +213,85 @@ class SFAAPI(object):
             if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
                 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
 
+            # ALLOCATE
             try:
-                self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
-            except:
-                raise RuntimeError("Fail to allocate resource for slice %s" % slicename)            
-            try:
-                self._sfi_exec_method('provision', slicename) 
+                self._log.debug("Allocating resources in slice %s" % slicename)
+                out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
             except:
-                raise RuntimeError("Fail to provision resource for slice %s" % slicename)
-            return True
+                raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
+
+            if out is not None:
+                # PROVISION
+                try:
+                    self._log.debug("Provisioning resources in slice %s" % slicename)
+                    self._sfi_exec_method('provision', slicename) 
+                except:
+                    raise RuntimeError("Fail to provision resource for slice %s" % slicename)
+                return True
+
+    def add_resource_to_slice_batch(self, slicename, resource_hrn, leases=None):
+        """
+        Method to add all resources together to the slice. Previous deletion of slivers.
+        """
+        # Specially used for wilabt that doesn't allow to add more resources to the slice
+        # after some resources are added. Every sliver have to be deleted and the batch 
+        # has to be added at once.
+        self._count += 1
+        self._slice_resources_batch.append(resource_hrn)
+        resources_hrn_new = list()
+        if self._count == len(self._total):
+            for resource_hrn in self._slice_resources_batch:
+                resource_parts = resource_hrn.split('.')
+                resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
+                resources_hrn_new.append(resource_hrn)
+            with self.lock_slice:
+                self._sfi_exec_method('delete', slicename)
+                # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
+                resources_urn = self._get_urn(resources_hrn_new)
+                rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
+
+                f = open("/tmp/rspec_input.rspec", "w")
+                f.truncate(0)
+                f.write(rspec)
+                f.close()
+
+                if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
+                    raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)
+
+                # ALLOCATE    
+                try:
+                    self._log.debug("Allocating resources in slice %s" % slicename)
+                    out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
+                except:
+                    raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
+
+                if out is not None:
+                    # PROVISION
+                    try:
+                        self._log.debug("Provisioning resources in slice %s" % slicename)
+                        self._sfi_exec_method('provision', slicename)
+                    except:
+                        raise RuntimeError("Fail to provision resource for slice %s" % slicename)
+                    return True
+                else:
+                    raise RuntimeError("Fail to allocate resources for slice %s" % slicename)
+    
+        else:
+            self._log.debug(" Waiting for more nodes to add the batch to the slice ")
+
+    def _get_urn(self, resources_hrn):
+        """
+        Get urn from hrn.
+        """
+        resources_urn = list()
+        for hrn in resources_hrn:
+            hrn = hrn.replace("\\", "").split('.')
+            node = hrn.pop()
+            auth = '.'.join(hrn)
+            urn = ['urn:publicid:IDN+', auth, '+node+', node]
+            urn = ''.join(urn)
+            resources_urn.append(urn)
+        return resources_urn
 
     def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
         """
@@ -199,12 +299,23 @@ class SFAAPI(object):
         and provision method.
         """
         resource_urn = self._get_resources_urn([resource_hrn]).pop()
-        try:
-            self._sfi_exec_method('delete', slicename, urn=resource_urn)
-        except:
-            raise RuntimeError("Fail to delete resource for slice %s" % slicename)
-        return True
+        with self.lock_slice:
+            try:
+                self._sfi_exec_method('delete', slicename, urn=resource_urn)
+            except:
+                raise RuntimeError("Fail to delete resource for slice %s" % slicename)
+            return True
 
+    def remove_all_from_slice(self, slicename):
+        """
+        De-allocate and de-provision all slivers of the named slice.
+        """
+        with self.lock_slice:
+            try:
+                self._sfi_exec_method('delete', slicename)
+            except:
+                raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
+            return True
 
     def _get_resources_urn(self, resources_hrn):
         """
@@ -254,7 +365,7 @@ class SFAAPIFactory(object):
    
     @classmethod
     def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
-            timeout = None):
+            batch = False, rtype = None, timeout = None):
 
         if sfi_user and sfi_sm:
             key = cls.make_key(sfi_user, sfi_sm)
@@ -263,7 +374,7 @@ class SFAAPIFactory(object):
 
                 if not api:
                     api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
-                        ec, timeout)
+                        ec, batch, rtype, timeout)
                     cls._apis[key] = api
 
                 return api