From ae738aa296174e88ddbeace62d1fb9f5eafec24f Mon Sep 17 00:00:00 2001 From: Lucia Guevgeozian Odizzio Date: Thu, 2 May 2013 13:42:47 +0200 Subject: [PATCH] Sfa api and parser for ple and omf --- src/neco/util/sfa_api.py | 263 +++++++++++++++++++++++++++++++++++++ src/neco/util/sfa_sfav1.py | 226 +++++++++++++++++++++++++++++++ 2 files changed, 489 insertions(+) create mode 100644 src/neco/util/sfa_api.py create mode 100644 src/neco/util/sfa_sfav1.py diff --git a/src/neco/util/sfa_api.py b/src/neco/util/sfa_api.py new file mode 100644 index 00000000..d1607b1c --- /dev/null +++ b/src/neco/util/sfa_api.py @@ -0,0 +1,263 @@ +import logging +import hashlib + +from parser import sfa_sfav1 +import subprocess +import warnings + +import threading + +class SFAApi(object): + + def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None, + sfi_registry = None, sfi_sm = None, timeout = None, private_key = None): + + self._resources = dict() + self._reservable_resources = list() + self._leases = dict() + self._slice_tags = dict() + self._slice_resources = set() + self._slice_leases = set() + self._aggregate = aggregate + self._slice_hrn = slice_id + # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3 + # For now is SFA V1 from PlanetLab and Nitos (wrong namespace) + self._parser = sfa_sfav1.SFAResourcesParser(['ple', 'omf']) + self._lock = threading.Lock() + + # Paremeters to contact the XMLRPC SFA service + self._sfi_parameters = {'-a': sfi_auth, '-u': sfi_user, + '-r': sfi_registry, '-s': sfi_sm, '-t': timeout, + '-k': private_key} + + #self._logger = logging.getLogger('nepi.utils.sfiapi') + self._fetch_resources_info() + self._fetch_slice_info() + + def _sfi_command_options(self): + command_options = " ".join("%s %s" % (k,v) for (k,v) in \ + self._sfi_parameters.iteritems() if v is not None) + return command_options + + def _sfi_command_exec(self, command): + args = command.split(" ") + s = subprocess.Popen(args, stdout = subprocess.PIPE, + stdin = subprocess.PIPE) + xml, err = s.communicate() + if err: + raise RuntimeError("Command excecution problem, error: %s", err) + return xml + + def _fetch_resources_info(self, resources = True): + command_options = self._sfi_command_options() + command = "sfi.py " + command_options + " resources -l all" + try: + xml = self._sfi_command_exec(command) + except: + #self._logger.error("Error in SFA responds: %s", xml) + raise + if resources: + self._resources, self._leases = self._parser.resources_from_xml(xml, resources = True) + else: + self._leases = self._parser.resources_from_xml(xml) + #self._update_reservable() + return xml + + def _fetch_slice_info(self): + command_options = self._sfi_command_options() + command = "sfi.py " + command_options + " resources -l all" + command = command + " " + self._slice_hrn + try: + xml = self._sfi_command_exec(command) + except: + #self._logger.error("Error in SFA responds: %s", xml) + raise + self._slice_resources, self._slice_leases, self._slice_tags = \ + self._parser.resources_from_xml(xml, sliver = True, resources = True) + return xml + + def _update_reservable(self): + for rid, r in self._resources.iteritems(): + if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE') \ + or (r['resource_type'] == 'channel'): + self._reservable_resources.append(rid) + + + def discover_resources(self, resourceId=None, fields=[], **kwargs): + result = dict() + resources = self._resources + + if resourceId is not None: + resource_ids = resourceId + if not isinstance(resource_ids, list): + resource_ids = [resource_ids] + resources = self._filter_by_resourceId(resources, resource_ids) + else: + for filter, value in kwargs.items(): + resources = self._filter_by_filter(resources, filter, value) + if not fields: + return resources + else: + for k, info in resources.iteritems(): + info = self._extract_fields(info, fields) + result[k] = info + return result + + def _filter_by_resourceId(self, resources, resource_ids): + return dict((k, resources[k]) for k in resource_ids if k in resources) + + def _filter_by_filter(self, resources, filter, value): + d = dict() + for k in resources.keys(): + if filter in resources[k]: + if resources[k][filter] == value: + d[k] = resources[k] + return d + + def _extract_fields(self, info, fields): + return dict((k, info[k]) for k in fields if k in info) + + def discover_fields(self): + resources = self._resources + fields = [] + for k, data in resources.iteritems(): + for field in data: + if field not in fields: + fields.append(field) + return fields + + def discover_leases(self, resourceId=None): + leases = self._leases + + if resourceId is not None: + resource_ids = resourceId + if not isinstance(resourceId, list): + resource_ids = [resource_ids] + leases = self._filterbyresourceId(leases, resource_ids) + return leases + + def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot): + result = dict() + if rtype not in ['node', 'channel']: + raise RuntimeError("Unknown type") + + finish_time = start_time + duration * slot + + leases_resources = dict() + reservable_resources = dict() + for lid, lease in leases.iteritems(): + if lease[0]['type'] == rtype: + leases_resources.update({lid: lease}) + #print leases_resources + for rid, resource in resources.iteritems(): + if rtype == 'node' and (resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE'): + reservable_resources.update({rid: resource}) + elif rtype == 'channel': + reservable_resources.update({rid: resource}) + #if resource['type'] == 'rtype' and resources['exclusive'].upper() == 'TRUE':\ + # (in case adding exclusive tag to channels) + + free_resources = list(set(reservable_resources.keys()) - set(leases_resources.keys())) + + if len(free_resources) >= quantity: + free_resources = free_resources[:quantity] + for rid, resource in resources.iteritems(): + if rid in free_resources: + result[rid] = resource + return result + else: + maybe_free = [] + new_quan = quantity - len(free_resources) + print new_quan + + for lid, lease in leases_resources.iteritems(): + for l in lease: + st = int(l['start_time']) + ft = st + int(l['duration']) * slot + if (st <= finish_time <= ft) or (st <= start_time <= ft): + if lid in maybe_free: + maybe_free.remove(lid) + break + else: + if lid not in maybe_free: + maybe_free.append(lid) + if len(maybe_free) >= new_quan: + free_resources = [free_resources, maybe_free] + free_resources = sum(free_resources, []) + for rid, resource in resources.iteritems(): + if rid in free_resources: + result[rid] = resource + return result + #return free_resources + warnings.warn("There aren't enough nodes") + + + def provision_resource(self, new_resource, start_time = None, duration = None): + import os, tempfile + with self._lock: + xml = self._fetch_slice_info() + new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,\ + new_resource, start_time, duration, self._aggregate) + fh, fname = tempfile.mkstemp() + print fname + os.write(fh, new_xml) + os.close(fh) + try: + command_options = self._sfi_command_options() + command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname) + out = self._sfi_command_exec(command) + except: + raise + xml = self._fetch_slice_info() + return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource, start_time,\ + duration, self._aggregate) + + def release_resource(self, resource, start_time = None, duration = None): + import os, tempfile + with self._lock: + xml = self._fetch_slice_info() + new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn, resource,\ + start_time, duration, self._aggregate) + fh, fname = tempfile.mkstemp() + print fname + os.write(fh, new_xml) + os.close(fh) + try: + command_options = self._sfi_command_options() + command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname) + out = self._sfi_command_exec(command) + except: + raise + xml = self._fetch_slice_info() + return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource, start_time,\ + duration, self._aggregate) + + +class SFAApiFactory(object): + lock = threading.Lock() + _apis = dict() + + @classmethod + def get_api(slice_id = None, sfi_auth = None, sfi_user = None, + sfi_registry = None, sfi_sm = None, timeout = None, private_key = None): + + key = cls.make_key(slice_id, sfi_auth, sfi_user, sfi_registry, sfi_sm, + timeout, private_key) + api = cls._apis.get(key) + cls.lock.acquire() + api._fetch_resources_info(resources = False) + api._fetch_slice_info() + cls.lock.release() + + if not api: + api = SFAApi(slice_id = None, sfi_auth = None, sfi_user = None, + sfi_registry = None, sfi_sm = None, timeout = None, private_key = None) + cls._apis[key] = api + + return api + + @classmethod + def make_key(cls, *args): + skey = "".join(map(str, args)) + return hashlib.md5(skey).hexdigest() + diff --git a/src/neco/util/sfa_sfav1.py b/src/neco/util/sfa_sfav1.py new file mode 100644 index 00000000..0f545b00 --- /dev/null +++ b/src/neco/util/sfa_sfav1.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- + +from lxml import etree +#import collections +import sys + +class SFAResourcesParser(object): + # Maybe this init method is not necessary, it was aim to check that the + # aggregate was supported by nepi + + def __init__(self, aggr_pattern): + if not isinstance(aggr_pattern, list): + self._aggr_pattern = [aggr_pattern] + else: + self._aggr_pattern = aggr_pattern + + def resources_from_xml(self, xml, sliver = False, resources = False): + rdata = dict() + ldata = dict() + stags = dict() + RSpec = etree.fromstring(xml) + RSpec_attr = dict(RSpec.attrib) + network = RSpec.findall('.//network') + for net in network: + aggr = net.get('name') + if aggr == 'ple' and resources: + node_tree = net.iterfind('node') + for node in list(node_tree): + if isinstance(node.tag, basestring): + data_ple = dict(node.attrib) + data_ple['aggregate'] = aggr + data_ple['resource_type'] = 'node' + data_ple = self._get_node_info(node, data_ple) + hostname = node.find('hostname') + rdata[hostname.text] = data_ple + if sliver: + sliver_defaults = net.find('sliver_defaults') + if len(sliver_defaults): + stags = self._get_sliver_tags(sliver_defaults, stags) + elif aggr == 'omf' and resources: + node_tree = net.iterfind('node') + for node in node_tree: + if isinstance(node.tag, basestring): + data_omf = dict(node.attrib) + data_omf['aggregate'] = aggr + data_omf['resource_type'] = 'node' + data_omf = self._get_node_info(node, data_omf) + hostname = node.find('hostname') + rdata[hostname.text] = data_omf + spectrum = net.find('spectrum') + for channel in list(spectrum): + if isinstance(channel.tag, basestring): + data_omf = dict(channel.attrib) + data_omf['aggregate'] = aggr + data_omf['resource_type'] = 'channel' + channelnum = data_omf['channel_num'] + rdata[channelnum] = data_omf + leases = net.iterfind('lease') + for lease in list(leases): + if isinstance(lease.tag, basestring): + (st, duration) = lease.attrib['start_time'], lease.attrib['duration'] + data_lease = dict(lease.attrib) + data_lease['aggregate'] = aggr + data_lease['resource_type'] = 'lease' + data_lease = self._get_leases_info(lease, data_lease) + ldata[(st, duration)] = data_lease + elif aggr == 'omf' and not resources: + leases = net.iterfind('lease') + for lease in list(leases): + if isinstance(lease.tag, basestring): + (st, duration) = lease.attrib['start_time'], lease.attrib['duration'] + data_lease = dict(lease.attrib) + data_lease['aggregate'] = aggr + data_lease['resource_type'] = 'lease' + data_lease = self._get_leases_info(lease, data_lease) + ldata[(st, duration)] = data_lease + else: + pass + if sliver: + return rdata, ldata, stags + elif resources: + return rdata, ldata + elif not resources: + return ldata + + def _get_node_info(self, node_tag, data_dict): + for n in list(node_tag): + if isinstance(n.tag, basestring): + if n.attrib: + data_dict[n.tag] = dict(n.attrib) + else: + data_dict[n.tag] = n.text + return data_dict + + def _get_leases_info(self, lease_tag, data_dict): + nodes = list() + channels = list() + for l in list(lease_tag): + if l.tag == 'node': + node = l.attrib['component_id'].split('+').pop() + nodes.append(node) + if l.tag == 'channel': + #TODO: find out key when channel reservation + #channels.append(l.attrib['averiguar']) channel_num + pass + data_dict['nodes'] = nodes + data_dict['channels'] = channels + return data_dict + + def _get_sliver_tags(self, sliverdefaults_tag, sliver_tag_dict): + vsys = list() + for info in list(sliverdefaults_tag): + if info.tag == 'vsys_vnet': + sliver_tag_dict['vsys_vnet'] = info.text + elif info.tag == 'vsys': + vsys.append(info.text) + sliver_tag_dict['vsys'] = vsys + return sliver_tag_dict + + def create_reservation_xml(self, xml, slice_hrn, new_resource, start_time, duration, aggregate): + aggrs = [] + RSpec = etree.fromstring(xml) + network = RSpec.findall('.//network') + for net in network: + aggr = net.get('name') + aggrs.append(aggr) + if aggr == aggregate: + new_xml = self._create_tags(RSpec, net, slice_hrn, new_resource, start_time, duration) + if aggregate not in aggrs: + new_net = etree.SubElement(RSpec, 'network', name = aggregate) + new_xml = self._create_tags(RSpec, new_net, slice_hrn, new_resource, start_time, duration) + return new_xml + + def _create_tags(self, RSpec, net, slice_hrn, new_resource, start_time, duration): + resource = new_resource.keys()[0] + res_type = new_resource[resource]['resource_type'] + if res_type == 'node': + node = etree.SubElement(net, res_type, \ + component_manager_id = new_resource[resource]['component_manager_id'],\ + component_id = new_resource[resource]['component_id'],\ + component_name = new_resource[resource]['component_name'], \ + site_id = new_resource[resource]['site_id']) + sliver_tag = etree.SubElement(node, 'sliver') + elif res_type == 'channel': + spectrum = etree.SubElement(net, spectrum) + channel = etree.SubElement(spectrum, channel,\ + channel_num = new_resource[resource]['channel_num'],\ + frequency = new_resource[resource]['frequency'],\ + standard = new_resource[resource]['standard']) + if start_time is not None and duration is not None: + slice_id = "urn:publicid:IDN+" + slice_hrn.split('.')[0] + ':' + slice_hrn.split('.')[1]\ + + '+slice+' + slice_hrn.split('.')[2] + lease = etree.SubElement(net, 'lease', slice_id = slice_id,\ + start_time = str(start_time), duration = str(duration)) + if res_type == 'node': + res = etree.SubElement(lease, res_type,\ + component_id = new_resource[resource]['component_id']) + elif res_type == 'channel': + res = etree.SubElement(lease, res_type,\ + channel_num = new_resource[resource]['channel_num']) + new_xml = etree.tostring(RSpec, xml_declaration=True) + return new_xml + + def verify_reservation_xml(self, xml, slice_hrn, new_resource, start_time, duration, aggregate): + slice_id = "urn:publicid:IDN+" + slice_hrn.split('.')[0] + ':' + slice_hrn.split('.')[1]\ + + '+slice+' + slice_hrn.split('.')[2] + rdata, ldata, stags = self.resources_from_xml(xml, sliver = True, resources = True) + res_name = new_resource.keys()[0] + if res_name in rdata.keys(): + if start_time and duration: + if ldata[(start_time, duration)]: + nodes = ldata[(start_time, duration)]['nodes'] + sliceid = ldata[(start_time, duration)]['slice_id'] + if res_name in nodes and sliceid == slice_id: + return True + else: return False + else: return False + else: return True + else: return False + + def release_reservation_xml(self, xml, slice_hrn, resource, start_time, duration, aggregate): + RSpec = etree.fromstring(xml) + network = RSpec.findall('.//network') + for net in network: + aggr = net.get('name') + if aggr == aggregate: + new_xml = self._delete_tag(RSpec, net, slice_hrn, resource, start_time, duration) + return new_xml + + def _delete_tag(self, RSpec, net, slice_hrn, resource, start_time, duration): + resource_name = resource.keys()[0] + res_type = resource[resource_name]['resource_type'] + if res_type == 'node': + node_tree = net.iterfind('node') + for node in list(node_tree): + if isinstance(node.tag, basestring): + data_node = dict(node.attrib) + if data_node['component_name'] == resource_name: + net.remove(node) + elif res_type == 'channel': + spectrum = net.find('spectrum') + for channel in list(spectrum): + if isinstance(channel.tag, basestring): + data_channel = dict(channel.attrib) + if data_channel['channel_num'] == resource_name: + spectrum.remove(channel) + if start_time is not None and duration is not None: + slice_id = "urn:publicid:IDN+" + slice_hrn.split('.')[0] + ':' + slice_hrn.split('.')[1]\ + + '+slice+' + slice_hrn.split('.')[2] + leases = net.iterfind('lease') + for lease in list(leases): + if isinstance(lease.tag, basestring): + (st, duration) = lease.attrib['start_time'], lease.attrib['duration'] + sliceid = lease.attrib['slice_id'] + if st == str(start_time) and duration == str(duration) and sliceid == slice_id: + for l in list(lease): + if l.tag == 'node' and res_type == 'node': + if l.attrib['component_id'].split('+').pop() == resource_name: + lease.remove(l) + elif l.tag == 'channel' and res_type == 'channel': + if l.attrib['channel_num'] == resource_name: + lease.remove(l) + new_xml = etree.tostring(RSpec, xml_declaration=True) + return new_xml + + -- 2.43.0