From: Lucia Guevgeozian Odizzio Date: Thu, 22 May 2014 13:44:20 +0000 (+0200) Subject: Planetlab + sfa node changes X-Git-Tag: nepi-3.1.0~77 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=6d8bb1d08981ce3e584d8a9e0276b58927290243;p=nepi.git Planetlab + sfa node changes --- diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py index 1f706a8d..7a8d5a72 100644 --- a/src/nepi/resources/planetlab/node.py +++ b/src/nepi/resources/planetlab/node.py @@ -390,7 +390,7 @@ class PlanetlabNode(LinuxNode): def do_release(self): super(PlanetlabNode, self).do_release() - if self.state == ResourceState.RELEASED: + if self.state == ResourceState.RELEASED and not self._skip_provision(): self.debug(" Releasing PLC API ") self.plapi.release() diff --git a/src/nepi/resources/planetlab/sfa_node.py b/src/nepi/resources/planetlab/sfa_node.py index 9a0684b7..59f1df0a 100644 --- a/src/nepi/resources/planetlab/sfa_node.py +++ b/src/nepi/resources/planetlab/sfa_node.py @@ -237,6 +237,7 @@ class PlanetlabSfaNode(LinuxNode): self.fail_node_not_alive(hostname) else: if self._check_if_in_slice([host_hrn]): + self.debug("The node %s is already in the slice" % hostname) self._slicenode = True self._node_to_provision = host_hrn super(PlanetlabSfaNode, self).do_discover() @@ -278,12 +279,12 @@ class PlanetlabSfaNode(LinuxNode): # def _blacklisted(self, host_hrn): if self.sfaapi.blacklisted(host_hrn): - self.fail_node_not_available(hostname) + self.fail_node_not_available(host_hrn) return False def _reserved(self, host_hrn): if self.sfaapi.reserved(host_hrn): - self.fail_node_not_available(hostname) + self.fail_node_not_available(host_hrn) return False def do_provision(self): @@ -330,7 +331,7 @@ class PlanetlabSfaNode(LinuxNode): # the timeout was reach without establishing ssh connection # the node is blacklisted, deleted from the slice, and a new # node to provision is discovered - self.warn(" Could not SSH login ") + self.warning(" Could not SSH login ") self._blacklist_node(node) self.do_discover() continue @@ -344,7 +345,7 @@ class PlanetlabSfaNode(LinuxNode): ((out2, err2), proc2) = self.execute(cmd) if out1.find("/proc type proc") < 0 or \ "Read-only file system".lower() in err2.lower(): - self.warn(" Corrupted file system ") + self.warning(" Corrupted file system ") self._blacklist_node(node) self.do_discover() continue @@ -529,11 +530,12 @@ class PlanetlabSfaNode(LinuxNode): slicename = 'ple.' + slicename self.sfaapi.add_resource_to_slice(slicename, host_hrn) -# def _delete_node_from_slice(self, node): -# self.warn(" Deleting node from slice ") -# slicename = self.get("username") -# self.plapi.delete_slice_node(slicename, [node]) -# + def _delete_from_slice(self): + self.warning(" Deleting node from slice ") + slicename = self.get("username").replace('_', '.') + slicename = 'ple.' + slicename + self.sfaapi.remove_all_from_slice(slicename) + def _get_hostname(self): hostname = self.get("hostname") if hostname: @@ -559,8 +561,10 @@ class PlanetlabSfaNode(LinuxNode): slicename = self.get("username").replace('_', '.') slicename = 'ple.' + slicename slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource'] - slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes) - nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn.values())) + if slice_nodes: + slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values() + else: slice_nodes_hrn = [] + nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn)) return nodes_inslice def _do_ping(self, hostname): diff --git a/src/nepi/util/sfaapi.py b/src/nepi/util/sfaapi.py index d2888fdb..48dfb485 100644 --- a/src/nepi/util/sfaapi.py +++ b/src/nepi/util/sfaapi.py @@ -39,12 +39,20 @@ class SFAAPI(object): API for quering the SFA service. """ def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec, - timeout): + batch, rtype, timeout): self._blacklist = set() self._reserved = set() self._resources_cache = None self._already_cached = False + self._ec = ec + + if batch: + self._testbed_res = rtype + self._count = 0 + self._total = self._get_total_res() + self._slice_resources_batch = list() + self._log = Logger("SFA API") self.api = Sfi() self.rspec_proc = SfaRSpecProcessing() @@ -73,6 +81,15 @@ class SFAAPI(object): for host in hosts_tobl: self._blacklist.add(host) + def _get_total_res(self): + rms = list() + res_gids = self._ec.resources + for gid in res_gids: + rm = self._ec.get_resource(gid) + if self._testbed_res.lower() in rm._rtype.lower(): + rms.append(rm) + return rms + def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None): """ Execute sfi method. @@ -85,9 +102,10 @@ class SFAAPI(object): if command == 'allocate': args_list = [slicename, rspec] - elif command == 'delete': - args_list = [slicename, urn] - else: args_list = [slicename, '-o', '/tmp/rspec_output'] + else: + args_list = [slicename] + if command != 'delete': + args_list = args_list + ['-o', '/tmp/rspec_output'] elif command == 'resources': args_list = ['-o', '/tmp/rspec_output'] @@ -97,17 +115,23 @@ class SFAAPI(object): self.api.command = command self.api.command_parser = self.api.create_parser_command(self.api.command) (command_options, command_args) = self.api.command_parser.parse_args(args_list) - #print "1 %s" % command_options.info - #command_options.info = "" - #print "2 %s" % command_options.info self.api.command_options = command_options self.api.read_config() self.api.bootstrap() - self.api.dispatch(command, command_options, command_args) - with open("/tmp/rspec_output.rspec", "r") as result_file: - result = result_file.read() - return result + try: + os.remove("/tmp/rspec_output.rspec") + except OSError: + self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist") + + try: + self.api.dispatch(command, command_options, command_args) + with open("/tmp/rspec_output.rspec", "r") as result_file: + result = result_file.read() + return result + except: + self._log.debug(" Couldn't retrive rspec output information from method %s " % command) + return None def get_resources_info(self): """ @@ -147,10 +171,13 @@ class SFAAPI(object): with self.lock_slice: rspec_slice = self._sfi_exec_method('describe', slicename) except: - raise RuntimeError("Fail to describe resource for slice %s" % slicename) + self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename) - result = self.rspec_proc.parse_sfa_rspec(rspec_slice) - return result + if rspec_slice is not None: + result = self.rspec_proc.parse_sfa_rspec(rspec_slice) + return result + else: + return {'resource':[],'lease':[]} def add_resource_to_slice(self, slicename, resource_hrn, leases=None): @@ -163,9 +190,11 @@ class SFAAPI(object): resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:]) resources_hrn_new.append(resource_hrn) - slice_resources = self.get_slice_resources(slicename)['resource'] - with self.lock_slice: + rspec_slice = self._sfi_exec_method('describe', slicename) + if rspec_slice is not None: + slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource'] + else: slice_resources = [] if slice_resources: slice_resources_hrn = self.get_resources_hrn(slice_resources) for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems(): @@ -173,6 +202,7 @@ class SFAAPI(object): s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:]) resources_hrn_new.append(s_hrn) + resources_urn = self._get_resources_urn(resources_hrn_new) rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases) f = open("/tmp/rspec_input.rspec", "w") @@ -183,15 +213,85 @@ class SFAAPI(object): if not os.path.getsize("/tmp/rspec_input.rspec") > 0: raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename) + # ALLOCATE try: - self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec") - except: - raise RuntimeError("Fail to allocate resource for slice %s" % slicename) - try: - self._sfi_exec_method('provision', slicename) + self._log.debug("Allocating resources in slice %s" % slicename) + out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec") except: - raise RuntimeError("Fail to provision resource for slice %s" % slicename) - return True + raise RuntimeError("Fail to allocate resource for slice %s" % slicename) + + if out is not None: + # PROVISION + try: + self._log.debug("Provisioning resources in slice %s" % slicename) + self._sfi_exec_method('provision', slicename) + except: + raise RuntimeError("Fail to provision resource for slice %s" % slicename) + return True + + def add_resource_to_slice_batch(self, slicename, resource_hrn, leases=None): + """ + Method to add all resources together to the slice. Previous deletion of slivers. + """ + # Specially used for wilabt that doesn't allow to add more resources to the slice + # after some resources are added. Every sliver have to be deleted and the batch + # has to be added at once. + self._count += 1 + self._slice_resources_batch.append(resource_hrn) + resources_hrn_new = list() + if self._count == len(self._total): + for resource_hrn in self._slice_resources_batch: + resource_parts = resource_hrn.split('.') + resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:]) + resources_hrn_new.append(resource_hrn) + with self.lock_slice: + self._sfi_exec_method('delete', slicename) + # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt + resources_urn = self._get_urn(resources_hrn_new) + rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases) + + f = open("/tmp/rspec_input.rspec", "w") + f.truncate(0) + f.write(rspec) + f.close() + + if not os.path.getsize("/tmp/rspec_input.rspec") > 0: + raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename) + + # ALLOCATE + try: + self._log.debug("Allocating resources in slice %s" % slicename) + out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec") + except: + raise RuntimeError("Fail to allocate resource for slice %s" % slicename) + + if out is not None: + # PROVISION + try: + self._log.debug("Provisioning resources in slice %s" % slicename) + self._sfi_exec_method('provision', slicename) + except: + raise RuntimeError("Fail to provision resource for slice %s" % slicename) + return True + else: + raise RuntimeError("Fail to allocate resources for slice %s" % slicename) + + else: + self._log.debug(" Waiting for more nodes to add the batch to the slice ") + + def _get_urn(self, resources_hrn): + """ + Get urn from hrn. + """ + resources_urn = list() + for hrn in resources_hrn: + hrn = hrn.replace("\\", "").split('.') + node = hrn.pop() + auth = '.'.join(hrn) + urn = ['urn:publicid:IDN+', auth, '+node+', node] + urn = ''.join(urn) + resources_urn.append(urn) + return resources_urn def remove_resource_from_slice(self, slicename, resource_hrn, leases=None): """ @@ -199,12 +299,23 @@ class SFAAPI(object): and provision method. """ resource_urn = self._get_resources_urn([resource_hrn]).pop() - try: - self._sfi_exec_method('delete', slicename, urn=resource_urn) - except: - raise RuntimeError("Fail to delete resource for slice %s" % slicename) - return True + with self.lock_slice: + try: + self._sfi_exec_method('delete', slicename, urn=resource_urn) + except: + raise RuntimeError("Fail to delete resource for slice %s" % slicename) + return True + def remove_all_from_slice(self, slicename): + """ + De-allocate and de-provision all slivers of the named slice. + """ + with self.lock_slice: + try: + self._sfi_exec_method('delete', slicename) + except: + raise RuntimeError("Fail to delete slivers for slice %s" % slicename) + return True def _get_resources_urn(self, resources_hrn): """ @@ -254,7 +365,7 @@ class SFAAPIFactory(object): @classmethod def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec, - timeout = None): + batch = False, rtype = None, timeout = None): if sfi_user and sfi_sm: key = cls.make_key(sfi_user, sfi_sm) @@ -263,7 +374,7 @@ class SFAAPIFactory(object): if not api: api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, - ec, timeout) + ec, batch, rtype, timeout) cls._apis[key] = api return api