Sfa api and parser for ple and omf
authorLucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
Thu, 2 May 2013 11:42:47 +0000 (13:42 +0200)
committerLucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
Thu, 2 May 2013 11:42:47 +0000 (13:42 +0200)
src/neco/util/sfa_api.py [new file with mode: 0644]
src/neco/util/sfa_sfav1.py [new file with mode: 0644]

diff --git a/src/neco/util/sfa_api.py b/src/neco/util/sfa_api.py
new file mode 100644 (file)
index 0000000..d1607b1
--- /dev/null
@@ -0,0 +1,263 @@
+import logging
+import hashlib
+from parser import sfa_sfav1
+import subprocess
+import warnings
+import threading
+class SFAApi(object):
+    def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None,
+            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
+        self._resources = dict()
+        self._reservable_resources = list()
+        self._leases = dict()
+        self._slice_tags = dict()
+        self._slice_resources = set()
+        self._slice_leases = set()
+        self._aggregate = aggregate
+        self._slice_hrn = slice_id
+        # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3
+        # For now is SFA V1 from PlanetLab and Nitos (wrong namespace)
+        self._parser = sfa_sfav1.SFAResourcesParser(['ple', 'omf'])
+        self._lock = threading.Lock()
+        # Paremeters to contact the XMLRPC SFA service
+        self._sfi_parameters = {'-a': sfi_auth, '-u': sfi_user,
+                '-r': sfi_registry, '-s': sfi_sm, '-t': timeout,
+                '-k': private_key}
+        #self._logger = logging.getLogger('nepi.utils.sfiapi')
+        self._fetch_resources_info()
+        self._fetch_slice_info()
+    def _sfi_command_options(self):
+        command_options = " ".join("%s %s" % (k,v) for (k,v) in \
+                self._sfi_parameters.iteritems() if v is not None)
+        return command_options
+    def _sfi_command_exec(self, command):
+        args = command.split(" ")
+        s = subprocess.Popen(args, stdout = subprocess.PIPE,
+                stdin = subprocess.PIPE)
+        xml, err = s.communicate()
+        if err:
+           raise RuntimeError("Command excecution problem, error: %s", err)
+        return xml
+    def _fetch_resources_info(self, resources = True):
+        command_options = self._sfi_command_options()
+        command = "sfi.py " + command_options + " resources -l all"
+        try:
+            xml = self._sfi_command_exec(command)
+        except:
+            #self._logger.error("Error in SFA responds: %s", xml)
+            raise
+        if resources:
+            self._resources, self._leases = self._parser.resources_from_xml(xml, resources = True)
+        else:
+            self._leases = self._parser.resources_from_xml(xml)
+        #self._update_reservable()
+        return xml
+    def _fetch_slice_info(self):
+        command_options = self._sfi_command_options()
+        command = "sfi.py " + command_options + " resources -l all"
+        command = command + " " + self._slice_hrn
+        try:
+            xml = self._sfi_command_exec(command)
+        except:
+            #self._logger.error("Error in SFA responds: %s", xml)
+            raise
+        self._slice_resources, self._slice_leases, self._slice_tags = \
+            self._parser.resources_from_xml(xml, sliver = True, resources = True)
+        return xml
+    def _update_reservable(self):
+        for rid, r in self._resources.iteritems():
+            if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE') \
+                 or (r['resource_type'] == 'channel'):
+                self._reservable_resources.append(rid)
+    def discover_resources(self, resourceId=None, fields=[], **kwargs):
+        result = dict()
+        resources = self._resources
+        if resourceId is not None:
+            resource_ids = resourceId
+            if not isinstance(resource_ids, list):
+                resource_ids = [resource_ids]
+            resources = self._filter_by_resourceId(resources, resource_ids)
+        else:
+            for filter, value in kwargs.items():
+                resources = self._filter_by_filter(resources, filter, value)
+        if not fields:
+            return resources
+        else:
+            for k, info in resources.iteritems():
+                info = self._extract_fields(info, fields)
+                result[k] = info
+            return result
+    def _filter_by_resourceId(self, resources, resource_ids):
+        return dict((k, resources[k]) for k in resource_ids if k in resources)
+    def _filter_by_filter(self, resources, filter, value):
+        d = dict()
+        for k in resources.keys():
+            if filter in resources[k]:
+                if resources[k][filter] == value:
+                    d[k] = resources[k]
+        return d
+    def _extract_fields(self, info, fields):
+        return dict((k, info[k]) for k in fields if k in info)
+    def discover_fields(self):
+        resources = self._resources
+        fields = []
+        for k, data in resources.iteritems():
+            for field in data:
+                if field not in fields:
+                    fields.append(field)
+        return fields
+    def discover_leases(self, resourceId=None):
+        leases = self._leases
+        if resourceId is not None:
+            resource_ids = resourceId
+            if not isinstance(resourceId, list):
+                resource_ids = [resource_ids]
+            leases = self._filterbyresourceId(leases, resource_ids)
+        return leases
+    def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot):
+        result = dict()
+        if rtype not in ['node', 'channel']:
+            raise RuntimeError("Unknown type")
+        finish_time = start_time + duration * slot
+        leases_resources = dict()
+        reservable_resources = dict()
+        for lid, lease in leases.iteritems():
+            if lease[0]['type'] == rtype:
+                leases_resources.update({lid: lease})
+        #print leases_resources
+        for rid, resource in resources.iteritems():
+            if rtype == 'node' and (resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE'):
+                reservable_resources.update({rid: resource})
+            elif rtype == 'channel':
+                reservable_resources.update({rid: resource})
+            #if resource['type'] == 'rtype' and resources['exclusive'].upper() == 'TRUE':\
+            # (in case adding exclusive tag to channels)
+        free_resources = list(set(reservable_resources.keys()) - set(leases_resources.keys()))
+        if len(free_resources) >= quantity:
+            free_resources = free_resources[:quantity]
+            for rid, resource in resources.iteritems():
+                if rid in free_resources:
+                    result[rid] = resource
+            return result
+        else:
+            maybe_free = []
+            new_quan = quantity - len(free_resources)
+            print new_quan
+            for lid, lease in leases_resources.iteritems():
+                for l in lease:
+                    st = int(l['start_time'])
+                    ft = st + int(l['duration']) * slot
+                    if (st <= finish_time <= ft) or (st <= start_time <= ft):
+                        if lid in maybe_free:
+                            maybe_free.remove(lid)
+                        break
+                    else:
+                        if lid not in maybe_free:
+                            maybe_free.append(lid)
+                if len(maybe_free) >= new_quan:
+                    free_resources = [free_resources, maybe_free]
+                    free_resources = sum(free_resources, [])
+                    for rid, resource in resources.iteritems():
+                        if rid in free_resources:
+                            result[rid] = resource
+                        return result
+                    #return free_resources
+            warnings.warn("There aren't enough nodes")
+    def provision_resource(self, new_resource, start_time = None, duration = None):
+        import os, tempfile
+        with self._lock:
+            xml = self._fetch_slice_info()
+            new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,\
+            new_resource, start_time, duration, self._aggregate)
+            fh, fname = tempfile.mkstemp()
+            print fname
+            os.write(fh, new_xml)
+            os.close(fh)
+            try:
+                command_options = self._sfi_command_options()
+                command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
+                out = self._sfi_command_exec(command)
+            except:
+                raise
+        xml = self._fetch_slice_info()
+        return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource, start_time,\
+                duration, self._aggregate)
+    def release_resource(self, resource, start_time = None, duration = None):
+        import os, tempfile
+        with self._lock:
+            xml = self._fetch_slice_info()
+            new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn, resource,\
+            start_time, duration, self._aggregate)
+            fh, fname = tempfile.mkstemp()
+            print fname
+            os.write(fh, new_xml)
+            os.close(fh)
+            try:
+                command_options = self._sfi_command_options()
+                command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
+                out = self._sfi_command_exec(command)
+            except:
+                raise
+        xml = self._fetch_slice_info()
+        return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource, start_time,\
+            duration, self._aggregate)
+class SFAApiFactory(object):
+    lock = threading.Lock()
+    _apis = dict()
+    @classmethod
+    def get_api(slice_id = None, sfi_auth = None, sfi_user = None,
+            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
+        key = cls.make_key(slice_id, sfi_auth, sfi_user, sfi_registry, sfi_sm,
+            timeout, private_key)
+        api = cls._apis.get(key)
+        cls.lock.acquire()
+        api._fetch_resources_info(resources = False)
+        api._fetch_slice_info()
+        cls.lock.release()
+        if not api:
+            api = SFAApi(slice_id = None, sfi_auth = None, sfi_user = None,
+            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None)
+            cls._apis[key] = api
+        return api
+    @classmethod
+    def make_key(cls, *args):
+        skey = "".join(map(str, args))
+        return hashlib.md5(skey).hexdigest()
diff --git a/src/neco/util/sfa_sfav1.py b/src/neco/util/sfa_sfav1.py
new file mode 100644 (file)
index 0000000..0f545b0
--- /dev/null
@@ -0,0 +1,226 @@
+# -*- coding: utf-8 -*-
+from lxml import etree
+#import collections
+import sys
+class SFAResourcesParser(object):
+    # Maybe this init method is not necessary, it was aim to check that the
+    # aggregate was supported by nepi
+    def __init__(self, aggr_pattern):
+        if not isinstance(aggr_pattern, list):
+            self._aggr_pattern = [aggr_pattern]
+        else:
+            self._aggr_pattern = aggr_pattern
+    def resources_from_xml(self, xml, sliver = False, resources = False):
+        rdata = dict()
+        ldata = dict()
+        stags = dict()
+        RSpec = etree.fromstring(xml)
+        RSpec_attr = dict(RSpec.attrib)
+        network = RSpec.findall('.//network')
+        for net in network:
+            aggr = net.get('name') 
+            if aggr == 'ple' and resources:
+                node_tree = net.iterfind('node')
+                for node in list(node_tree): 
+                    if isinstance(node.tag, basestring):
+                        data_ple = dict(node.attrib)
+                        data_ple['aggregate'] = aggr
+                        data_ple['resource_type'] = 'node'
+                        data_ple = self._get_node_info(node, data_ple)
+                        hostname = node.find('hostname')
+                        rdata[hostname.text] = data_ple
+                if sliver:
+                    sliver_defaults = net.find('sliver_defaults')
+                    if len(sliver_defaults):
+                        stags = self._get_sliver_tags(sliver_defaults, stags)
+            elif aggr == 'omf' and resources:
+                node_tree = net.iterfind('node')
+                for node in node_tree:
+                    if isinstance(node.tag, basestring):
+                        data_omf = dict(node.attrib)
+                        data_omf['aggregate'] = aggr
+                        data_omf['resource_type'] = 'node'
+                        data_omf = self._get_node_info(node, data_omf)
+                        hostname = node.find('hostname')
+                        rdata[hostname.text] = data_omf
+                spectrum = net.find('spectrum')
+                for channel in list(spectrum):
+                    if isinstance(channel.tag, basestring):
+                        data_omf = dict(channel.attrib)
+                        data_omf['aggregate'] = aggr
+                        data_omf['resource_type'] = 'channel'
+                        channelnum = data_omf['channel_num']
+                        rdata[channelnum] = data_omf
+                leases = net.iterfind('lease')
+                for lease in list(leases):
+                    if isinstance(lease.tag, basestring):
+                        (st, duration) = lease.attrib['start_time'], lease.attrib['duration']
+                        data_lease = dict(lease.attrib)
+                        data_lease['aggregate'] = aggr
+                        data_lease['resource_type'] = 'lease'
+                        data_lease = self._get_leases_info(lease, data_lease)
+                        ldata[(st, duration)] = data_lease
+            elif aggr == 'omf' and not resources:
+                leases = net.iterfind('lease')
+                for lease in list(leases):
+                    if isinstance(lease.tag, basestring):
+                        (st, duration) = lease.attrib['start_time'], lease.attrib['duration']
+                        data_lease = dict(lease.attrib)
+                        data_lease['aggregate'] = aggr
+                        data_lease['resource_type'] = 'lease'
+                        data_lease = self._get_leases_info(lease, data_lease)
+                        ldata[(st, duration)] = data_lease
+            else:
+                pass
+        if sliver:
+            return rdata, ldata, stags
+        elif resources:
+            return rdata, ldata
+        elif not resources:
+            return ldata
+    def _get_node_info(self, node_tag, data_dict):
+        for n in list(node_tag):
+            if isinstance(n.tag, basestring):
+                if n.attrib:
+                    data_dict[n.tag] = dict(n.attrib)
+                else:
+                    data_dict[n.tag] = n.text
+        return data_dict
+    def _get_leases_info(self, lease_tag, data_dict):
+        nodes = list()
+        channels = list()
+        for l in list(lease_tag):
+            if l.tag == 'node':
+                node = l.attrib['component_id'].split('+').pop()
+                nodes.append(node)
+            if l.tag == 'channel':
+                #TODO: find out key when channel reservation
+                #channels.append(l.attrib['averiguar']) channel_num
+                pass
+            data_dict['nodes'] = nodes
+            data_dict['channels'] = channels
+        return data_dict
+    def _get_sliver_tags(self, sliverdefaults_tag, sliver_tag_dict):
+        vsys = list()
+        for info in list(sliverdefaults_tag):
+            if info.tag == 'vsys_vnet':
+                sliver_tag_dict['vsys_vnet'] = info.text
+            elif info.tag == 'vsys':
+                vsys.append(info.text)
+        sliver_tag_dict['vsys'] = vsys
+        return sliver_tag_dict
+    def create_reservation_xml(self, xml, slice_hrn, new_resource, start_time, duration, aggregate):
+        aggrs = []
+        RSpec = etree.fromstring(xml)
+        network = RSpec.findall('.//network')
+        for net in network:
+            aggr = net.get('name')
+            aggrs.append(aggr)
+            if aggr == aggregate:
+                new_xml = self._create_tags(RSpec, net, slice_hrn, new_resource, start_time, duration)
+        if aggregate not in aggrs:
+            new_net = etree.SubElement(RSpec, 'network', name = aggregate)
+            new_xml = self._create_tags(RSpec, new_net, slice_hrn, new_resource, start_time, duration)
+        return new_xml
+    def _create_tags(self, RSpec, net, slice_hrn, new_resource, start_time, duration):
+        resource = new_resource.keys()[0]
+        res_type = new_resource[resource]['resource_type']
+        if res_type == 'node':
+            node = etree.SubElement(net, res_type, \
+            component_manager_id = new_resource[resource]['component_manager_id'],\
+            component_id = new_resource[resource]['component_id'],\
+            component_name = new_resource[resource]['component_name'], \
+            site_id = new_resource[resource]['site_id'])
+            sliver_tag = etree.SubElement(node, 'sliver')
+        elif res_type == 'channel':
+            spectrum = etree.SubElement(net, spectrum)
+            channel = etree.SubElement(spectrum, channel,\
+            channel_num = new_resource[resource]['channel_num'],\
+            frequency = new_resource[resource]['frequency'],\
+            standard = new_resource[resource]['standard'])
+        if start_time is not None and duration is not None:
+            slice_id = "urn:publicid:IDN+" + slice_hrn.split('.')[0] + ':' + slice_hrn.split('.')[1]\
+            + '+slice+' + slice_hrn.split('.')[2]
+            lease = etree.SubElement(net, 'lease', slice_id = slice_id,\
+            start_time = str(start_time), duration = str(duration))
+            if res_type == 'node':
+                res = etree.SubElement(lease, res_type,\
+                component_id = new_resource[resource]['component_id'])
+            elif res_type == 'channel':
+                res = etree.SubElement(lease, res_type,\
+                channel_num = new_resource[resource]['channel_num'])
+        new_xml = etree.tostring(RSpec, xml_declaration=True)
+        return new_xml
+    def verify_reservation_xml(self, xml, slice_hrn, new_resource, start_time, duration, aggregate):
+        slice_id = "urn:publicid:IDN+" + slice_hrn.split('.')[0] + ':' + slice_hrn.split('.')[1]\
+        + '+slice+' + slice_hrn.split('.')[2]
+        rdata, ldata, stags = self.resources_from_xml(xml, sliver = True, resources = True)
+        res_name = new_resource.keys()[0]
+        if res_name in rdata.keys():
+            if start_time and duration:
+                if ldata[(start_time, duration)]:
+                    nodes = ldata[(start_time, duration)]['nodes']
+                    sliceid = ldata[(start_time, duration)]['slice_id']
+                    if res_name in nodes and sliceid == slice_id:
+                        return True
+                    else: return False
+                else: return False
+            else: return True
+        else: return False
+    def release_reservation_xml(self, xml, slice_hrn, resource, start_time, duration, aggregate):
+        RSpec = etree.fromstring(xml)
+        network = RSpec.findall('.//network')
+        for net in network:
+            aggr = net.get('name')
+            if aggr == aggregate:
+                new_xml = self._delete_tag(RSpec, net, slice_hrn, resource, start_time, duration)
+                return new_xml
+    def _delete_tag(self, RSpec, net, slice_hrn, resource, start_time, duration):
+        resource_name = resource.keys()[0]
+        res_type = resource[resource_name]['resource_type']
+        if res_type == 'node':
+            node_tree = net.iterfind('node')
+            for node in list(node_tree):
+                if isinstance(node.tag, basestring):
+                    data_node = dict(node.attrib)
+                    if data_node['component_name'] == resource_name:
+                        net.remove(node)
+        elif res_type == 'channel':
+            spectrum = net.find('spectrum')
+            for channel in list(spectrum):
+                if isinstance(channel.tag, basestring):
+                    data_channel = dict(channel.attrib)
+                    if data_channel['channel_num'] == resource_name:
+                        spectrum.remove(channel)
+        if start_time is not None and duration is not None:
+            slice_id = "urn:publicid:IDN+" + slice_hrn.split('.')[0] + ':' + slice_hrn.split('.')[1]\
+            + '+slice+' + slice_hrn.split('.')[2]
+            leases = net.iterfind('lease')
+            for lease in list(leases):
+                if isinstance(lease.tag, basestring):
+                    (st, duration) = lease.attrib['start_time'], lease.attrib['duration']
+                    sliceid = lease.attrib['slice_id']
+                    if st == str(start_time) and duration == str(duration) and sliceid == slice_id:
+                        for l in list(lease):
+                            if l.tag == 'node' and res_type == 'node':
+                                if l.attrib['component_id'].split('+').pop() == resource_name:
+                                    lease.remove(l)
+                            elif l.tag == 'channel' and res_type == 'channel':
+                                if l.attrib['channel_num'] == resource_name:
+                                    lease.remove(l)
+        new_xml = etree.tostring(RSpec, xml_declaration=True)
+        return new_xml