updates for MAX aggregate
[sfa.git] / sfa / managers / aggregate_manager_max.py
1 -from sfa.util.rspec import RSpec
2 -import sys
3 -import pdb
4 -from sfa.util.namespace import *
5 -from sfa.util.rspec import *
6 -from sfa.util.specdict import *
7 -from sfa.util.faults import *
8 -from sfa.util.storage import *
9 -from sfa.util.policy import Policy
10 -from sfa.util.debug import log
11 -from sfa.server.aggregate import Aggregates
12 +from sfa.util.xrn import urn_to_hrn, hrn_to_urn, get_authority
13 +from sfa.util.plxrn import hrn_to_pl_slicename
14 +from sfa.util.plxrn import hrn_to_pl_slicename
15  from sfa.server.registry import Registries
16 +from sfa.util.rspec import RSpec
17 +from sfa.util.sfalogging import sfa_logger
18  from sfa.util.faults import *
19 -
20 -import xml.dom.minidom
21 -
22 -SFA_MAX_CONF_FILE = '/etc/sfa/max_allocations'
23 -SFA_MAX_DEFAULT_RSPEC = '/etc/sfa/max_physical.xml'
24 -SFA_MAX_CANNED_RSPEC = '/etc/sfa/max_physical_canned.xml'
25 -
26 -topology = {}
27 -
28 -class SfaOutOfResource(SfaFault):
29 -    def __init__(self, interface):
30 -        faultString = "Interface " + interface + " not available"
31 -        SfaFault.__init__(self, 100, faultString, '')
32 -
33 -class SfaNoPairRSpec(SfaFault):
34 -    def __init__(self, interface, interface2):
35 -        faultString = "Interface " + interface + " should be paired with " + interface2
36 -        SfaFault.__init__(self, 100, faultString, '')
37 -
38 -# Returns a mapping from interfaces to the nodes they lie on and their peer interfaces
39 -# i -> node,i_peer
40 -
41 -def get_interface_map():
42 -    r = RSpec()
43 -    r.parseFile(SFA_MAX_DEFAULT_RSPEC)
44 -    rspec = r.toDict()
45 -    capacity = rspec['rspec']['capacity']
46 -    netspec = capacity[0]['netspec'][0]
47 -    linkdefs = {}
48 -    for n in netspec['nodespec']:
49 -        ifspecs = n['ifspec']
50 -        nodename = n['node']
51 -        for i in ifspecs:
52 -            ifname = i['name']
53 -            linkid = i['linkid']
54 -
55 -            if (linkdefs.has_key(linkid)):
56 -                linkdefs[linkid].extend([(nodename,ifname)])
57 -            else:
58 -                linkdefs[linkid]=[(nodename,ifname)]
59 -    
60 -    # topology maps interface x interface -> link,node1,node2
61 -    topology={}
62 -
63 -    for k in linkdefs.keys():
64 -        (n1,i1) = linkdefs[k][0]
65 -        (n2,i2) = linkdefs[k][1]
66 -
67 -        topology[i1] = (n1, i2)
68 -        topology[i2] = (n2, i1)
69 -        
70 -
71 -    return topology    
72 -
73 -    
74 -def allocations_to_rspec(allocations):
75 -    rspec = xml.dom.minidom.parse(SFA_MAX_DEFAULT_RSPEC)
76 -    req = rspec.firstChild.appendChild(rspec.createElement("request"))
77 -    for (iname,ip) in allocations:
78 -        ifspec = req.appendChild(rspec.createElement("ifspec"))
79 -        ifspec.setAttribute("name","tns:"+iname)
80 -        ifspec.setAttribute("ip",ip)
81 -
82 -    return rspec.toxml()
83 -        
84 -    
85 -def if_endpoints(ifs):
86 -    nodes=[]
87 -    for l in ifs:
88 -        nodes.extend(topology[l][0])
89 -    return nodes
90 -
91 -def lock_state_file():
92 -    # Noop for demo
93 -    return True
94 -
95 -def unlock_state_file():
96 -    return True
97 -    # Noop for demo
98 -
99 -def read_alloc_dict():
100 -    alloc_dict={}
101 -    rows = open(SFA_MAX_CONF_FILE).read().split('\n')
102 -    for r in rows:
103 -        columns = r.split(' ')
104 -        if (len(columns)==2):
105 -            hrn = columns[0]
106 -            allocs = columns[1].split(',')
107 -            ipallocs = map(lambda alloc:alloc.split('/'), allocs)
108 -            alloc_dict[hrn]=ipallocs
109 -    return alloc_dict
110 -
111 -def commit_alloc_dict(d):
112 -    f = open(SFA_MAX_CONF_FILE, 'w')
113 -    for hrn in d.keys():
114 -        columns = d[hrn]
115 -        ipcolumns = map(lambda x:"/".join(x), columns)
116 -        row = hrn+' '+','.join(ipcolumns)+'\n'
117 -        f.write(row)
118 -    f.close()
119 -
120 -def collapse_alloc_dict(d):
121 -    ret = []
122 -    for k in d.keys():
123 -        ret.extend(d[k])
124 +from sfa.util.config import Config
125 +from sfa.managers.aggregate_manager_pl import GetVersion
126 +import os
127 +import time
128 +
129 +RSPEC_TMP_FILE_PREFIX = "/tmp/max_rspec"
130 +
131 +# execute shell command and return both exit code and text output
132 +def shell_execute(cmd, timeout):
133 +    pipe = os.popen('{ ' + cmd + '; } 2>&1', 'r')
134 +    pipe = os.popen(cmd + ' 2>&1', 'r')
135 +    text = ''
136 +    while timeout:
137 +        line = pipe.read()
138 +        text += line
139 +        time.sleep(1)
140 +        timeout = timeout-1
141 +    code = pipe.close()
142 +    if code is None: code = 0
143 +    if text[-1:] == '\n': text = text[:-1]
144 +    return code, text
145 +
146 +"""
147 + call AM API client with command like in the following example:
148 + cd aggregate_client; java -classpath AggregateWS-client-api.jar:lib/* \
149 +      net.geni.aggregate.client.examples.CreateSliceNetworkClient \
150 +      ./repo https://geni:8443/axis2/services/AggregateGENI \
151 +      ... params ...
152 +"""
153 +
154 +def call_am_apiclient(client_app, params, timeout):
155 +    (client_path, am_url) = Config().get_max_aggrMgr_info()
156 +    sys_cmd = "cd " + client_path + "; java -classpath AggregateWS-client-api.jar:lib/* net.geni.aggregate.client.examples." + client_app + " ./repo " + am_url + " " + ' '.join(params)
157 +    ret = shell_execute(sys_cmd, timeout)
158 +    sfa_logger().debug("shell_execute cmd: %s returns %s" % (sys_cmd, ret))
159      return ret
160  
161 -
162 -def alloc_links(api, hrn, links_to_add, links_to_drop):
163 -    slicename=hrn_to_pl_slicename(hrn)
164 -    for (iface,ip) in links_to_add:
165 -        node = topology[iface][0][0]
166 -        try:
167 -            api.plshell.AddSliceTag(api.plauth, slicename, "ip_addresses", ip, node)
168 -            api.plshell.AddSliceTag(api.plauth, slicename, "vsys", "getvlan", node)
169 -        except Exception: 
170 -            # Probably a duplicate tag. XXX July 21
171 -            pass
172 +# save request RSpec xml content to a tmp file
173 +def save_rspec_to_file(rspec):
174 +    path = RSPEC_TMP_FILE_PREFIX + "_" + time.strftime('%Y%m%dT%H:%M:%S', time.gmtime(time.time())) +".xml"
175 +    file = open(path, "w")
176 +    file.write(rspec)
177 +    file.close()
178 +    return path
179 +
180 +# get stripped down slice id/name plc:maxpl:xi_slice1 --> xi_slice1
181 +def get_short_slice_id(cred, hrn):
182 +    if hrn == None:
183 +        return None
184 +    slice_id = hrn[hrn.rfind('+')+1:]
185 +    if slice_id == None:
186 +        slice_id = hrn[hrn.rfind(':')+1:]
187 +    if slice_id == None:
188 +       return hrn
189 +       pass
190 +    return str(slice_id)
191 +
192 +# extract xml 
193 +def get_xml_by_tag(text, tag):
194 +    indx1 = text.find('<'+tag)
195 +    indx2 = text.find('/'+tag+'>')
196 +    xml = None
197 +    if indx1!=-1 and indx2>indx1:
198 +        xml = text[indx1:indx2+len(tag)+2]
199 +    return xml
200 +
201 +def create_slice(api, xrn, cred, rspec, users):
202 +    indx1 = rspec.find("<RSpec")
203 +    indx2 = rspec.find("</RSpec>")
204 +    if indx1 > -1 and indx2 > indx1:
205 +        rspec = rspec[indx1+len("<RSpec type=\"SFA\">"):indx2-1]
206 +    rspec_path = save_rspec_to_file(rspec)
207 +    (ret, output) = call_am_apiclient("CreateSliceNetworkClient", [rspec_path,], 3)
208 +    # parse output ?
209 +    rspec = "<RSpec type=\"SFA\"> Done! </RSpec>"
210      return True
211  
212 -def alloc_nodes(api,hrn, requested_ifs):
213 -    requested_nodes = if_endpoints(requested_ifs)
214 -    create_slice_max_aggregate(api, hrn, requested_nodes)
215 -
216 -# Taken from slices.py
217 -
218 -def create_slice_max_aggregate(api, hrn, nodes):
219 -    # Get the slice record 
220 -    global topology
221 -    topology = get_interface_map()
222 -    slice = {}
223 -    registries = Registries(api)
224 -    registry = registries[api.hrn]
225 -    credential = api.getCredential()
226 -    records = registry.resolve(credential, hrn)
227 -    for record in records:
228 -        if record.get_type() in ['slice']:
229 -            slice = record.as_dict()
230 -    if not slice:
231 -        raise RecordNotFound(hrn)   
232 -
233 -    # Make sure slice exists at plc, if it doesnt add it
234 -    slicename = hrn_to_pl_slicename(hrn)
235 -    slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids'])
236 -    if not slices:
237 -        parts = slicename.split("_")
238 -        login_base = parts[0]
239 -        # if site doesnt exist add it
240 -        sites = api.plshell.GetSites(api.plauth, [login_base])
241 -        if not sites:
242 -            authority = get_authority(hrn)
243 -            site_records = registry.resolve(credential, authority)
244 -            site_record = {}
245 -            if not site_records:
246 -                raise RecordNotFound(authority)
247 -            site_record = site_records[0]
248 -            site = site_record.as_dict()
249 -                
250 -            # add the site
251 -            site.pop('site_id')
252 -            site_id = api.plshell.AddSite(api.plauth, site)
253 -        else:
254 -            site = sites[0]
255 -            
256 -        slice_fields = {}
257 -        slice_keys = ['name', 'url', 'description']
258 -        for key in slice_keys:
259 -            if key in slice and slice[key]:
260 -                slice_fields[key] = slice[key]  
261 -        api.plshell.AddSlice(api.plauth, slice_fields)
262 -        slice = slice_fields
263 -        slice['node_ids'] = 0
264 -    else:
265 -        slice = slices[0]    
266 -
267 -    # get the list of valid slice users from the registry and make 
268 -    # they are added to the slice 
269 -    researchers = record.get('researcher', [])
270 -    for researcher in researchers:
271 -        person_record = {}
272 -        person_records = registry.resolve(credential, researcher)
273 -        for record in person_records:
274 -            if record.get_type() in ['user']:
275 -                person_record = record
276 -        if not person_record:
277 -            pass
278 -        person_dict = person_record.as_dict()
279 -        persons = api.plshell.GetPersons(api.plauth, [person_dict['email']],
280 -                                         ['person_id', 'key_ids'])
281 -
282 -        # Create the person record 
283 -        if not persons:
284 -            person_id=api.plshell.AddPerson(api.plauth, person_dict)
285 -
286 -            # The line below enables the user account on the remote aggregate
287 -            # soon after it is created.
288 -            # without this the user key is not transfered to the slice
289 -            # (as GetSlivers returns key of only enabled users),
290 -            # which prevents the user from login to the slice.
291 -            # We may do additional checks before enabling the user.
292 -
293 -            api.plshell.UpdatePerson(api.plauth, person_id, {'enabled' : True})
294 -            key_ids = []
295 -        else:
296 -            key_ids = persons[0]['key_ids']
297 -
298 -        api.plshell.AddPersonToSlice(api.plauth, person_dict['email'],
299 -                                     slicename)        
300 -
301 -        # Get this users local keys
302 -        keylist = api.plshell.GetKeys(api.plauth, key_ids, ['key'])
303 -        keys = [key['key'] for key in keylist]
304 -
305 -        # add keys that arent already there 
306 -        for personkey in person_dict['keys']:
307 -            if personkey not in keys:
308 -                key = {'key_type': 'ssh', 'key': personkey}
309 -                api.plshell.AddPersonKey(api.plauth, person_dict['email'], key)
310 -
311 -    # find out where this slice is currently running
312 -    nodelist = api.plshell.GetNodes(api.plauth, slice['node_ids'],
313 -                                    ['hostname'])
314 -    hostnames = [node['hostname'] for node in nodelist]
315 -
316 -    # remove nodes not in rspec
317 -    deleted_nodes = list(set(hostnames).difference(nodes))
318 -    # add nodes from rspec
319 -    added_nodes = list(set(nodes).difference(hostnames))
320 -
321 -    api.plshell.AddSliceToNodes(api.plauth, slicename, added_nodes) 
322 -    api.plshell.DeleteSliceFromNodes(api.plauth, slicename, deleted_nodes)
323 -
324 +def delete_slice(api, xrn, cred):
325 +    slice_id = get_short_slice_id(cred, xrn)
326 +    (ret, output) = call_am_apiclient("DeleteSliceNetworkClient", [slice_id,], 3)
327 +    # parse output ?
328      return 1
329  
330 -
331 -def get_rspec(api, creds, options):
332 -    # get slice's hrn from options
333 -    xrn = options.get('geni_slice_urn', None)
334 -    hrn, type = urn_to_hrn(xrn)
335 -    # Eg. config line:
336 -    # plc.princeton.sapan vlan23,vlan45
337 -
338 -    allocations = read_alloc_dict()
339 -    if (hrn and allocations.has_key(hrn)):
340 -            ret_rspec = allocations_to_rspec(allocations[hrn])
341 +def get_rspec(api, cred, options):
342 +    #geni_slice_urn: urn:publicid:IDN+plc:maxpl+slice+xi_rspec_test1
343 +    urn = options.get('geni_slice_urn')
344 +    slice_id = get_short_slice_id(cred, urn)
345 +    if slice_id == None:
346 +        (ret, output) = call_am_apiclient("GetResourceTopology", ['all', '\"\"'], 5)
347      else:
348 -        ret_rspec = open(SFA_MAX_CANNED_RSPEC).read()
349 -
350 -    return (ret_rspec)
351 -
352 -
353 -def create_slice(api, xrn, creds, rspec_xml, users):
354 -    global topology
355 -    hrn = urn_to_hrn(xrn)[0]
356 -    topology = get_interface_map()
357 -
358 -    # Check if everything in rspec is either allocated by hrn
359 -    # or not allocated at all.
360 -    r = RSpec()
361 -    r.parseString(rspec_xml)
362 -    rspec = r.toDict()
363 -
364 -    lock_state_file()
365 -
366 -    allocations = read_alloc_dict()
367 -    requested_allocations = rspec_to_allocations (rspec)
368 -    current_allocations = collapse_alloc_dict(allocations)
369 -    try:
370 -        current_hrn_allocations=allocations[hrn]
371 -    except KeyError:
372 -        current_hrn_allocations=[]
373 -
374 -    # Check request against current allocations
375 -    requested_interfaces = map(lambda(elt):elt[0], requested_allocations)
376 -    current_interfaces = map(lambda(elt):elt[0], current_allocations)
377 -    current_hrn_interfaces = map(lambda(elt):elt[0], current_hrn_allocations)
378 -
379 -    for a in requested_interfaces:
380 -        if (a not in current_hrn_interfaces and a in current_interfaces):
381 -            raise SfaOutOfResource(a)
382 -        if (topology[a][1] not in requested_interfaces):
383 -            raise SfaNoPairRSpec(a,topology[a][1])
384 -    # Request OK
385 -
386 -    # Allocations to delete
387 -    allocations_to_delete = []
388 -    for a in current_hrn_allocations:
389 -        if (a not in requested_allocations):
390 -            allocations_to_delete.extend([a])
391 -
392 -    # Ok, let's do our thing
393 -    alloc_nodes(api, hrn, requested_interfaces)
394 -    alloc_links(api, hrn, requested_allocations, allocations_to_delete)
395 -    allocations[hrn] = requested_allocations
396 -    commit_alloc_dict(allocations)
397 -
398 -    unlock_state_file()
399 -
400 -    return True
401 -
402 -def rspec_to_allocations(rspec):
403 -    ifs = []
404 -    try:
405 -        ifspecs = rspec['rspec']['request'][0]['ifspec']
406 -        for l in ifspecs:
407 -            ifs.extend([(l['name'].replace('tns:',''),l['ip'])])
408 -    except KeyError:
409 -        # Bad RSpec
410 -        pass
411 -    return ifs
412 +        (ret, output) = call_am_apiclient("GetResourceTopology", ['all', slice_id,], 5)
413 +    # parse output into rspec XML
414 +    if output.find("No resouce found") > 0:
415 +        rspec = "<RSpec type=\"SFA\"> <Fault>No resource found</Fault> </RSpec>"
416 +    else:
417 +        comp_rspec = get_xml_by_tag(output, 'computeResource')
418 +        sfa_logger().debug("#### computeResource %s" % comp_rspec)
419 +        topo_rspec = get_xml_by_tag(output, 'topology')
420 +        sfa_logger().debug("#### topology %s" % topo_rspec)
421 +        rspec = "<RSpec type=\"SFA\"> <network name=\"" + Config().get_interface_hrn() + "\">";
422 +        if comp_rspec != None:
423 +            rspec = rspec + get_xml_by_tag(output, 'computeResource')
424 +        if topo_rspec != None:
425 +            rspec = rspec + get_xml_by_tag(output, 'topology')
426 +        rspec = rspec + "</network> </RSpec>"
427 +
428 +    return (rspec)
429 +
430 +def start_slice(api, xrn, cred):
431 +    # service not supported
432 +    return None
433 +
434 +def stop_slice(api, xrn, cred):
435 +    # service not supported
436 +    return None
437 +
438 +def reset_slices(api, xrn):
439 +    # service not supported
440 +    return None
441 +
442 +"""
443 +Returns the request context required by sfatables. At some point, this mechanism should be changed
444 +to refer to "contexts", which is the information that sfatables is requesting. But for now, we just
445 +return the basic information needed in a dict.
446 +"""
447 +def fetch_context(slice_hrn, user_hrn, contexts):
448 +    base_context = {'sfa':{'user':{'hrn':user_hrn}}}
449 +    return base_context
450  
451  def main():
452 -    t = get_interface_map()
453 +    api = SfaAPI()
454      r = RSpec()
455      rspec_xml = open(sys.argv[1]).read()
456 -    #get_rspec(None,'foo')
457 -    create_slice(None, "plc.princeton.sap0", rspec_xml)
458 -    
459 +    create_slice(api, "plc.maxpl.test000", None, rspec_xml, None)
460 +
461  if __name__ == "__main__":
462      main()