From: Tony Mack Date: Tue, 27 Jul 2010 22:39:50 +0000 (+0000) Subject: fig bug in get_ticket(), ticket now has merged rspec X-Git-Tag: sfa-1.0-0~132 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=2a3f75a249b97f738bb1bfb3d702a2fb0ee5a21f;p=sfa.git fig bug in get_ticket(), ticket now has merged rspec --- diff --git a/sfa/managers/slice_manager_pl.py b/sfa/managers/slice_manager_pl.py index efbe3b29..72227d88 100644 --- a/sfa/managers/slice_manager_pl.py +++ b/sfa/managers/slice_manager_pl.py @@ -9,7 +9,7 @@ from copy import deepcopy from lxml import etree from StringIO import StringIO from types import StringTypes - +from sfa.util.rspec import merge_rspecs from sfa.util.namespace import * from sfa.util.rspec import * from sfa.util.specdict import * @@ -19,6 +19,7 @@ from sfa.util.policy import Policy from sfa.util.prefixTree import prefixTree from sfa.util.sfaticket import * from sfa.util.threadmanager import ThreadManager +import sfa.util.xmlrpcprotocol as xmlrpcprotocol from sfa.util.debug import log import sfa.plc.peers as peers @@ -66,86 +67,67 @@ def create_slice(api, xrn, rspec, origin_hrn=None): def get_ticket(api, xrn, rspec, origin_hrn=None): slice_hrn, type = urn_to_hrn(xrn) # get the netspecs contained within the clients rspec - client_rspec = RSpec(xml=rspec) - netspecs = client_rspec.getDictsByTagName('NetSpec') + aggregate_rspecs = {} + tree= etree.parse(StringIO(rspec)) + elements = tree.findall('./network') + for element in elements: + aggregate_hrn = element.values()[0] + aggregate_rspecs[aggregate_hrn] = rspec + + # get a ticket from each aggregate + credential = api.getCredential() + threads = ThreadManager() + for aggregate, aggregate_rspec in aggregate_rspecs.items(): + server = None + if aggregate in api.aggregates: + server = api.aggregates[aggregate] + else: + net_urn = hrn_to_urn(aggregate, 'authority') + # we may have a peer that knows about this aggregate + for agg in api.aggregates: + agg_info = api.aggregates[agg].get_aggregates(credential, net_urn) + if agg_info: + # send the request to this address + url = 'http://%s:%s' % (agg_info['addr'], agg_info['port']) + server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file) + break + if server is None: + continue + threads.run(server.get_ticket, credential, xrn, aggregate_rspec, origin_hrn) + results = threads.get_results() - # create an rspec for each individual rspec - rspecs = {} - temp_rspec = RSpec() - for netspec in netspecs: - net_hrn = netspec['name'] - resources = {'start_time': 0, 'end_time': 0 , - 'network': {'NetSpec' : netspec}} - resourceDict = {'RSpec': resources} - temp_rspec.parseDict(resourceDict) - rspecs[net_hrn] = temp_rspec.toxml() + # gather information from each ticket + rspecs = [] + initscripts = [] + slivers = [] + object_gid = None + for result in results: + agg_ticket = SfaTicket(string=result) + attrs = agg_ticket.get_attributes() + if not object_gid: + object_gid = agg_ticket.get_gid_object() + print object_gid + rspecs.append(agg_ticket.get_rspec()) + initscripts.extend(attrs.get('initscripts', [])) + slivers.extend(attrs.get('slivers', [])) - # send the rspec to the appropiate aggregate/sm - aggregates = api.aggregates - credential = api.getCredential() - tickets = {} - for net_hrn in rspecs: - net_urn = urn_to_hrn(net_hrn) - try: - # if we are directly connected to the aggregate then we can just - # send them the request. if not, then we may be connected to an sm - # thats connected to the aggregate - if net_hrn in aggregates: - ticket = aggregates[net_hrn].get_ticket(credential, xrn, \ - rspecs[net_hrn], origin_hrn) - tickets[net_hrn] = ticket - else: - # lets forward this rspec to a sm that knows about the network - for agg in aggregates: - network_found = aggregates[agg].get_aggregates(credential, net_urn) - if network_found: - ticket = aggregates[aggregate].get_ticket(credential, \ - slice_hrn, rspecs[net_hrn], origin_hrn) - tickets[aggregate] = ticket - except: - print >> log, "Error getting ticket for %(slice_hrn)s at aggregate %(net_hrn)s" % \ - locals() - - # create a new ticket - new_ticket = SfaTicket(subject = slice_hrn) - new_ticket.set_gid_caller(api.auth.client_gid) - new_ticket.set_issuer(key=api.key, subject=api.hrn) - - tmp_rspec = RSpec() - networks = [] - valid_data = { - 'timestamp': int(time.time()), - 'initscripts': [], - 'slivers': [] - } - # merge data from aggregate ticket into new ticket - for agg_ticket in tickets.values(): - # get data from this ticket - agg_ticket = SfaTicket(string=agg_ticket) - attributes = agg_ticket.get_attributes() - if attributes.get('initscripts', []) != None: - valid_data['initscripts'].extend(attributes.get('initscripts', [])) - if attributes.get('slivers', []) != None: - valid_data['slivers'].extend(attributes.get('slivers', [])) - - # set the object gid - object_gid = agg_ticket.get_gid_object() - new_ticket.set_gid_object(object_gid) - new_ticket.set_pubkey(object_gid.get_pubkey()) + # merge info + attributes = {'initscripts': initscripts, + 'slivers': slivers} + merged_rspec = merge_rspecs(rspecs) - # build the rspec - tmp_rspec.parseString(agg_ticket.get_rspec()) - networks.extend([{'NetSpec': tmp_rspec.getDictsByTagName('NetSpec')}]) - + # create a new ticket + ticket = SfaTicket(subject = slice_hrn) + ticket.set_gid_caller(api.auth.client_gid) + ticket.set_issuer(key=api.key, subject=api.hrn) + ticket.set_gid_object(object_gid) + ticket.set_pubkey(object_gid.get_pubkey()) #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn)) - new_ticket.set_attributes(valid_data) - resources = {'networks': networks, 'start_time': 0, 'duration': 0} - resourceDict = {'RSpec': resources} - tmp_rspec.parseDict(resourceDict) - new_ticket.set_rspec(tmp_rspec.toxml()) - new_ticket.encode() - new_ticket.sign() - return new_ticket.save_to_string(save_parents=True) + ticket.set_attributes(attributes) + ticket.set_rspec(merged_rspec) + ticket.encode() + ticket.sign() + return ticket.save_to_string(save_parents=True) def start_slice(api, xrn): credential = api.getCredential() @@ -153,6 +135,7 @@ def start_slice(api, xrn): for aggregate in api.aggregates: server = api.aggregates[aggregate] threads.run(server.stop_slice, credential, xrn) + threads.get_results() return 1 def stop_slice(api, xrn): @@ -161,6 +144,7 @@ def stop_slice(api, xrn): for aggregate in api.aggregates: server = api.aggregates[aggregate] threads.run(server.stop_slice, credential, xrn) + threads.get_results() return 1 def reset_slice(api, xrn):