--- /dev/null
+import logging
+import hashlib
+
+from parser import sfa_sfav1
+import subprocess
+import warnings
+
+import threading
+
+class SFAApi(object):
+
+ def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None,
+ sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
+
+ self._resources = dict()
+ self._reservable_resources = list()
+ self._leases = dict()
+ self._slice_tags = dict()
+ self._slice_resources = set()
+ self._slice_leases = set()
+ self._aggregate = aggregate
+ self._slice_hrn = slice_id
+ # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3
+ # For now is SFA V1 from PlanetLab and Nitos (wrong namespace)
+ self._parser = sfa_sfav1.SFAResourcesParser(['ple', 'omf'])
+ self._lock = threading.Lock()
+
+ # Paremeters to contact the XMLRPC SFA service
+ self._sfi_parameters = {'-a': sfi_auth, '-u': sfi_user,
+ '-r': sfi_registry, '-s': sfi_sm, '-t': timeout,
+ '-k': private_key}
+
+ #self._logger = logging.getLogger('nepi.utils.sfiapi')
+ self._fetch_resources_info()
+ self._fetch_slice_info()
+
+ def _sfi_command_options(self):
+ command_options = " ".join("%s %s" % (k,v) for (k,v) in \
+ self._sfi_parameters.iteritems() if v is not None)
+ return command_options
+
+ def _sfi_command_exec(self, command):
+ args = command.split(" ")
+ s = subprocess.Popen(args, stdout = subprocess.PIPE,
+ stdin = subprocess.PIPE)
+ xml, err = s.communicate()
+ if err:
+ raise RuntimeError("Command excecution problem, error: %s", err)
+ return xml
+
+ def _fetch_resources_info(self, resources = True):
+ command_options = self._sfi_command_options()
+ command = "sfi.py " + command_options + " resources -l all"
+ try:
+ xml = self._sfi_command_exec(command)
+ except:
+ #self._logger.error("Error in SFA responds: %s", xml)
+ raise
+ if resources:
+ self._resources, self._leases = self._parser.resources_from_xml(xml, resources = True)
+ else:
+ self._leases = self._parser.resources_from_xml(xml)
+ #self._update_reservable()
+ return xml
+
+ def _fetch_slice_info(self):
+ command_options = self._sfi_command_options()
+ command = "sfi.py " + command_options + " resources -l all"
+ command = command + " " + self._slice_hrn
+ try:
+ xml = self._sfi_command_exec(command)
+ except:
+ #self._logger.error("Error in SFA responds: %s", xml)
+ raise
+ self._slice_resources, self._slice_leases, self._slice_tags = \
+ self._parser.resources_from_xml(xml, sliver = True, resources = True)
+ return xml
+
+ def _update_reservable(self):
+ for rid, r in self._resources.iteritems():
+ if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE') \
+ or (r['resource_type'] == 'channel'):
+ self._reservable_resources.append(rid)
+
+
+ def discover_resources(self, resourceId=None, fields=[], **kwargs):
+ result = dict()
+ resources = self._resources
+
+ if resourceId is not None:
+ resource_ids = resourceId
+ if not isinstance(resource_ids, list):
+ resource_ids = [resource_ids]
+ resources = self._filter_by_resourceId(resources, resource_ids)
+ else:
+ for filter, value in kwargs.items():
+ resources = self._filter_by_filter(resources, filter, value)
+ if not fields:
+ return resources
+ else:
+ for k, info in resources.iteritems():
+ info = self._extract_fields(info, fields)
+ result[k] = info
+ return result
+
+ def _filter_by_resourceId(self, resources, resource_ids):
+ return dict((k, resources[k]) for k in resource_ids if k in resources)
+
+ def _filter_by_filter(self, resources, filter, value):
+ d = dict()
+ for k in resources.keys():
+ if filter in resources[k]:
+ if resources[k][filter] == value:
+ d[k] = resources[k]
+ return d
+
+ def _extract_fields(self, info, fields):
+ return dict((k, info[k]) for k in fields if k in info)
+
+ def discover_fields(self):
+ resources = self._resources
+ fields = []
+ for k, data in resources.iteritems():
+ for field in data:
+ if field not in fields:
+ fields.append(field)
+ return fields
+
+ def discover_leases(self, resourceId=None):
+ leases = self._leases
+
+ if resourceId is not None:
+ resource_ids = resourceId
+ if not isinstance(resourceId, list):
+ resource_ids = [resource_ids]
+ leases = self._filterbyresourceId(leases, resource_ids)
+ return leases
+
+ def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot):
+ result = dict()
+ if rtype not in ['node', 'channel']:
+ raise RuntimeError("Unknown type")
+
+ finish_time = start_time + duration * slot
+
+ leases_resources = dict()
+ reservable_resources = dict()
+ for lid, lease in leases.iteritems():
+ if lease[0]['type'] == rtype:
+ leases_resources.update({lid: lease})
+ #print leases_resources
+ for rid, resource in resources.iteritems():
+ if rtype == 'node' and (resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE'):
+ reservable_resources.update({rid: resource})
+ elif rtype == 'channel':
+ reservable_resources.update({rid: resource})
+ #if resource['type'] == 'rtype' and resources['exclusive'].upper() == 'TRUE':\
+ # (in case adding exclusive tag to channels)
+
+ free_resources = list(set(reservable_resources.keys()) - set(leases_resources.keys()))
+
+ if len(free_resources) >= quantity:
+ free_resources = free_resources[:quantity]
+ for rid, resource in resources.iteritems():
+ if rid in free_resources:
+ result[rid] = resource
+ return result
+ else:
+ maybe_free = []
+ new_quan = quantity - len(free_resources)
+ print new_quan
+
+ for lid, lease in leases_resources.iteritems():
+ for l in lease:
+ st = int(l['start_time'])
+ ft = st + int(l['duration']) * slot
+ if (st <= finish_time <= ft) or (st <= start_time <= ft):
+ if lid in maybe_free:
+ maybe_free.remove(lid)
+ break
+ else:
+ if lid not in maybe_free:
+ maybe_free.append(lid)
+ if len(maybe_free) >= new_quan:
+ free_resources = [free_resources, maybe_free]
+ free_resources = sum(free_resources, [])
+ for rid, resource in resources.iteritems():
+ if rid in free_resources:
+ result[rid] = resource
+ return result
+ #return free_resources
+ warnings.warn("There aren't enough nodes")
+
+
+ def provision_resource(self, new_resource, start_time = None, duration = None):
+ import os, tempfile
+ with self._lock:
+ xml = self._fetch_slice_info()
+ new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,\
+ new_resource, start_time, duration, self._aggregate)
+ fh, fname = tempfile.mkstemp()
+ print fname
+ os.write(fh, new_xml)
+ os.close(fh)
+ try:
+ command_options = self._sfi_command_options()
+ command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
+ out = self._sfi_command_exec(command)
+ except:
+ raise
+ xml = self._fetch_slice_info()
+ return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource, start_time,\
+ duration, self._aggregate)
+
+ def release_resource(self, resource, start_time = None, duration = None):
+ import os, tempfile
+ with self._lock:
+ xml = self._fetch_slice_info()
+ new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn, resource,\
+ start_time, duration, self._aggregate)
+ fh, fname = tempfile.mkstemp()
+ print fname
+ os.write(fh, new_xml)
+ os.close(fh)
+ try:
+ command_options = self._sfi_command_options()
+ command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
+ out = self._sfi_command_exec(command)
+ except:
+ raise
+ xml = self._fetch_slice_info()
+ return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource, start_time,\
+ duration, self._aggregate)
+
+
+class SFAApiFactory(object):
+ lock = threading.Lock()
+ _apis = dict()
+
+ @classmethod
+ def get_api(slice_id = None, sfi_auth = None, sfi_user = None,
+ sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
+
+ key = cls.make_key(slice_id, sfi_auth, sfi_user, sfi_registry, sfi_sm,
+ timeout, private_key)
+ api = cls._apis.get(key)
+ cls.lock.acquire()
+ api._fetch_resources_info(resources = False)
+ api._fetch_slice_info()
+ cls.lock.release()
+
+ if not api:
+ api = SFAApi(slice_id = None, sfi_auth = None, sfi_user = None,
+ sfi_registry = None, sfi_sm = None, timeout = None, private_key = None)
+ cls._apis[key] = api
+
+ return api
+
+ @classmethod
+ def make_key(cls, *args):
+ skey = "".join(map(str, args))
+ return hashlib.md5(skey).hexdigest()
+
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from lxml import etree
+#import collections
+import sys
+
+class SFAResourcesParser(object):
+ # Maybe this init method is not necessary, it was aim to check that the
+ # aggregate was supported by nepi
+
+ def __init__(self, aggr_pattern):
+ if not isinstance(aggr_pattern, list):
+ self._aggr_pattern = [aggr_pattern]
+ else:
+ self._aggr_pattern = aggr_pattern
+
+ def resources_from_xml(self, xml, sliver = False, resources = False):
+ rdata = dict()
+ ldata = dict()
+ stags = dict()
+ RSpec = etree.fromstring(xml)
+ RSpec_attr = dict(RSpec.attrib)
+ network = RSpec.findall('.//network')
+ for net in network:
+ aggr = net.get('name')
+ if aggr == 'ple' and resources:
+ node_tree = net.iterfind('node')
+ for node in list(node_tree):
+ if isinstance(node.tag, basestring):
+ data_ple = dict(node.attrib)
+ data_ple['aggregate'] = aggr
+ data_ple['resource_type'] = 'node'
+ data_ple = self._get_node_info(node, data_ple)
+ hostname = node.find('hostname')
+ rdata[hostname.text] = data_ple
+ if sliver:
+ sliver_defaults = net.find('sliver_defaults')
+ if len(sliver_defaults):
+ stags = self._get_sliver_tags(sliver_defaults, stags)
+ elif aggr == 'omf' and resources:
+ node_tree = net.iterfind('node')
+ for node in node_tree:
+ if isinstance(node.tag, basestring):
+ data_omf = dict(node.attrib)
+ data_omf['aggregate'] = aggr
+ data_omf['resource_type'] = 'node'
+ data_omf = self._get_node_info(node, data_omf)
+ hostname = node.find('hostname')
+ rdata[hostname.text] = data_omf
+ spectrum = net.find('spectrum')
+ for channel in list(spectrum):
+ if isinstance(channel.tag, basestring):
+ data_omf = dict(channel.attrib)
+ data_omf['aggregate'] = aggr
+ data_omf['resource_type'] = 'channel'
+ channelnum = data_omf['channel_num']
+ rdata[channelnum] = data_omf
+ leases = net.iterfind('lease')
+ for lease in list(leases):
+ if isinstance(lease.tag, basestring):
+ (st, duration) = lease.attrib['start_time'], lease.attrib['duration']
+ data_lease = dict(lease.attrib)
+ data_lease['aggregate'] = aggr
+ data_lease['resource_type'] = 'lease'
+ data_lease = self._get_leases_info(lease, data_lease)
+ ldata[(st, duration)] = data_lease
+ elif aggr == 'omf' and not resources:
+ leases = net.iterfind('lease')
+ for lease in list(leases):
+ if isinstance(lease.tag, basestring):
+ (st, duration) = lease.attrib['start_time'], lease.attrib['duration']
+ data_lease = dict(lease.attrib)
+ data_lease['aggregate'] = aggr
+ data_lease['resource_type'] = 'lease'
+ data_lease = self._get_leases_info(lease, data_lease)
+ ldata[(st, duration)] = data_lease
+ else:
+ pass
+ if sliver:
+ return rdata, ldata, stags
+ elif resources:
+ return rdata, ldata
+ elif not resources:
+ return ldata
+
+ def _get_node_info(self, node_tag, data_dict):
+ for n in list(node_tag):
+ if isinstance(n.tag, basestring):
+ if n.attrib:
+ data_dict[n.tag] = dict(n.attrib)
+ else:
+ data_dict[n.tag] = n.text
+ return data_dict
+
+ def _get_leases_info(self, lease_tag, data_dict):
+ nodes = list()
+ channels = list()
+ for l in list(lease_tag):
+ if l.tag == 'node':
+ node = l.attrib['component_id'].split('+').pop()
+ nodes.append(node)
+ if l.tag == 'channel':
+ #TODO: find out key when channel reservation
+ #channels.append(l.attrib['averiguar']) channel_num
+ pass
+ data_dict['nodes'] = nodes
+ data_dict['channels'] = channels
+ return data_dict
+
+ def _get_sliver_tags(self, sliverdefaults_tag, sliver_tag_dict):
+ vsys = list()
+ for info in list(sliverdefaults_tag):
+ if info.tag == 'vsys_vnet':
+ sliver_tag_dict['vsys_vnet'] = info.text
+ elif info.tag == 'vsys':
+ vsys.append(info.text)
+ sliver_tag_dict['vsys'] = vsys
+ return sliver_tag_dict
+
+ def create_reservation_xml(self, xml, slice_hrn, new_resource, start_time, duration, aggregate):
+ aggrs = []
+ RSpec = etree.fromstring(xml)
+ network = RSpec.findall('.//network')
+ for net in network:
+ aggr = net.get('name')
+ aggrs.append(aggr)
+ if aggr == aggregate:
+ new_xml = self._create_tags(RSpec, net, slice_hrn, new_resource, start_time, duration)
+ if aggregate not in aggrs:
+ new_net = etree.SubElement(RSpec, 'network', name = aggregate)
+ new_xml = self._create_tags(RSpec, new_net, slice_hrn, new_resource, start_time, duration)
+ return new_xml
+
+ def _create_tags(self, RSpec, net, slice_hrn, new_resource, start_time, duration):
+ resource = new_resource.keys()[0]
+ res_type = new_resource[resource]['resource_type']
+ if res_type == 'node':
+ node = etree.SubElement(net, res_type, \
+ component_manager_id = new_resource[resource]['component_manager_id'],\
+ component_id = new_resource[resource]['component_id'],\
+ component_name = new_resource[resource]['component_name'], \
+ site_id = new_resource[resource]['site_id'])
+ sliver_tag = etree.SubElement(node, 'sliver')
+ elif res_type == 'channel':
+ spectrum = etree.SubElement(net, spectrum)
+ channel = etree.SubElement(spectrum, channel,\
+ channel_num = new_resource[resource]['channel_num'],\
+ frequency = new_resource[resource]['frequency'],\
+ standard = new_resource[resource]['standard'])
+ if start_time is not None and duration is not None:
+ slice_id = "urn:publicid:IDN+" + slice_hrn.split('.')[0] + ':' + slice_hrn.split('.')[1]\
+ + '+slice+' + slice_hrn.split('.')[2]
+ lease = etree.SubElement(net, 'lease', slice_id = slice_id,\
+ start_time = str(start_time), duration = str(duration))
+ if res_type == 'node':
+ res = etree.SubElement(lease, res_type,\
+ component_id = new_resource[resource]['component_id'])
+ elif res_type == 'channel':
+ res = etree.SubElement(lease, res_type,\
+ channel_num = new_resource[resource]['channel_num'])
+ new_xml = etree.tostring(RSpec, xml_declaration=True)
+ return new_xml
+
+ def verify_reservation_xml(self, xml, slice_hrn, new_resource, start_time, duration, aggregate):
+ slice_id = "urn:publicid:IDN+" + slice_hrn.split('.')[0] + ':' + slice_hrn.split('.')[1]\
+ + '+slice+' + slice_hrn.split('.')[2]
+ rdata, ldata, stags = self.resources_from_xml(xml, sliver = True, resources = True)
+ res_name = new_resource.keys()[0]
+ if res_name in rdata.keys():
+ if start_time and duration:
+ if ldata[(start_time, duration)]:
+ nodes = ldata[(start_time, duration)]['nodes']
+ sliceid = ldata[(start_time, duration)]['slice_id']
+ if res_name in nodes and sliceid == slice_id:
+ return True
+ else: return False
+ else: return False
+ else: return True
+ else: return False
+
+ def release_reservation_xml(self, xml, slice_hrn, resource, start_time, duration, aggregate):
+ RSpec = etree.fromstring(xml)
+ network = RSpec.findall('.//network')
+ for net in network:
+ aggr = net.get('name')
+ if aggr == aggregate:
+ new_xml = self._delete_tag(RSpec, net, slice_hrn, resource, start_time, duration)
+ return new_xml
+
+ def _delete_tag(self, RSpec, net, slice_hrn, resource, start_time, duration):
+ resource_name = resource.keys()[0]
+ res_type = resource[resource_name]['resource_type']
+ if res_type == 'node':
+ node_tree = net.iterfind('node')
+ for node in list(node_tree):
+ if isinstance(node.tag, basestring):
+ data_node = dict(node.attrib)
+ if data_node['component_name'] == resource_name:
+ net.remove(node)
+ elif res_type == 'channel':
+ spectrum = net.find('spectrum')
+ for channel in list(spectrum):
+ if isinstance(channel.tag, basestring):
+ data_channel = dict(channel.attrib)
+ if data_channel['channel_num'] == resource_name:
+ spectrum.remove(channel)
+ if start_time is not None and duration is not None:
+ slice_id = "urn:publicid:IDN+" + slice_hrn.split('.')[0] + ':' + slice_hrn.split('.')[1]\
+ + '+slice+' + slice_hrn.split('.')[2]
+ leases = net.iterfind('lease')
+ for lease in list(leases):
+ if isinstance(lease.tag, basestring):
+ (st, duration) = lease.attrib['start_time'], lease.attrib['duration']
+ sliceid = lease.attrib['slice_id']
+ if st == str(start_time) and duration == str(duration) and sliceid == slice_id:
+ for l in list(lease):
+ if l.tag == 'node' and res_type == 'node':
+ if l.attrib['component_id'].split('+').pop() == resource_name:
+ lease.remove(l)
+ elif l.tag == 'channel' and res_type == 'channel':
+ if l.attrib['channel_num'] == resource_name:
+ lease.remove(l)
+ new_xml = etree.tostring(RSpec, xml_declaration=True)
+ return new_xml
+
+