fixed indentation
[sfa.git] / sfa / managers / aggregate_manager_max.py
1 from sfa.util.rspec import RSpec
2 import sys
3 import pdb
4 from sfa.util.namespace import *
5 from sfa.util.rspec import *
6 from sfa.util.specdict import *
7 from sfa.util.faults import *
8 from sfa.util.storage import *
9 from sfa.util.policy import Policy
10 from sfa.util.debug import log
11 from sfa.server.aggregate import Aggregates
12 from sfa.util.xrn import urn_to_hrn, hrn_to_urn, get_authority
13 from sfa.util.plxrn import hrn_to_pl_slicename
14 from sfa.util.plxrn import hrn_to_pl_slicename
15 from sfa.server.registry import Registries
16 from sfa.util.rspec import RSpec
17 from sfa.util.sfalogging import sfa_logger
18 from sfa.util.faults import *
19
20 import xml.dom.minidom
21
22 SFA_MAX_CONF_FILE = '/etc/sfa/max_allocations'
23 SFA_MAX_DEFAULT_RSPEC = '/etc/sfa/max_physical.xml'
24 SFA_MAX_CANNED_RSPEC = '/etc/sfa/max_physical_canned.xml'
25
26 topology = {}
27
28 class SfaOutOfResource(SfaFault):
29     def __init__(self, interface):
30         faultString = "Interface " + interface + " not available"
31         SfaFault.__init__(self, 100, faultString, '')
32
33 class SfaNoPairRSpec(SfaFault):
34     def __init__(self, interface, interface2):
35         faultString = "Interface " + interface + " should be paired with " + interface2
36         SfaFault.__init__(self, 100, faultString, '')
37
38 # Returns a mapping from interfaces to the nodes they lie on and their peer interfaces
39 # i -> node,i_peer
40
41 def get_interface_map():
42     r = RSpec()
43     r.parseFile(SFA_MAX_DEFAULT_RSPEC)
44     rspec = r.toDict()
45     capacity = rspec['rspec']['capacity']
46     netspec = capacity[0]['netspec'][0]
47     linkdefs = {}
48     for n in netspec['nodespec']:
49         ifspecs = n['ifspec']
50         nodename = n['node']
51         for i in ifspecs:
52             ifname = i['name']
53             linkid = i['linkid']
54
55             if (linkdefs.has_key(linkid)):
56                 linkdefs[linkid].extend([(nodename,ifname)])
57             else:
58                 linkdefs[linkid]=[(nodename,ifname)]
59     
60     # topology maps interface x interface -> link,node1,node2
61     topology={}
62
63     for k in linkdefs.keys():
64         (n1,i1) = linkdefs[k][0]
65         (n2,i2) = linkdefs[k][1]
66
67         topology[i1] = (n1, i2)
68         topology[i2] = (n2, i1)
69         
70
71     return topology    
72
73     
74 def allocations_to_rspec(allocations):
75     rspec = xml.dom.minidom.parse(SFA_MAX_DEFAULT_RSPEC)
76     req = rspec.firstChild.appendChild(rspec.createElement("request"))
77     for (iname,ip) in allocations:
78         ifspec = req.appendChild(rspec.createElement("ifspec"))
79         ifspec.setAttribute("name","tns:"+iname)
80         ifspec.setAttribute("ip",ip)
81
82     return rspec.toxml()
83         
84     
85 def if_endpoints(ifs):
86     nodes=[]
87     for l in ifs:
88         nodes.extend(topology[l][0])
89     return nodes
90
91 def lock_state_file():
92     # Noop for demo
93     return True
94
95 def unlock_state_file():
96     return True
97     # Noop for demo
98
99 def read_alloc_dict():
100     alloc_dict={}
101     rows = open(SFA_MAX_CONF_FILE).read().split('\n')
102     for r in rows:
103         columns = r.split(' ')
104         if (len(columns)==2):
105             hrn = columns[0]
106             allocs = columns[1].split(',')
107             ipallocs = map(lambda alloc:alloc.split('/'), allocs)
108             alloc_dict[hrn]=ipallocs
109     return alloc_dict
110
111 def commit_alloc_dict(d):
112     f = open(SFA_MAX_CONF_FILE, 'w')
113     for hrn in d.keys():
114         columns = d[hrn]
115         ipcolumns = map(lambda x:"/".join(x), columns)
116         row = hrn+' '+','.join(ipcolumns)+'\n'
117         f.write(row)
118     f.close()
119
120 def collapse_alloc_dict(d):
121     ret = []
122     for k in d.keys():
123         ret.extend(d[k])
124 from sfa.util.config import Config
125 from sfa.managers.aggregate_manager_pl import GetVersion
126 import os
127 import time
128
129 RSPEC_TMP_FILE_PREFIX = "/tmp/max_rspec"
130
131 # execute shell command and return both exit code and text output
132 def shell_execute(cmd, timeout):
133     pipe = os.popen('{ ' + cmd + '; } 2>&1', 'r')
134     pipe = os.popen(cmd + ' 2>&1', 'r')
135     text = ''
136     while timeout:
137         line = pipe.read()
138         text += line
139         time.sleep(1)
140         timeout = timeout-1
141     code = pipe.close()
142     if code is None: code = 0
143     if text[-1:] == '\n': text = text[:-1]
144     return code, text
145
146 """
147  call AM API client with command like in the following example:
148  cd aggregate_client; java -classpath AggregateWS-client-api.jar:lib/* \
149       net.geni.aggregate.client.examples.CreateSliceNetworkClient \
150       ./repo https://geni:8443/axis2/services/AggregateGENI \
151       ... params ...
152 """
153
154 def call_am_apiclient(client_app, params, timeout):
155     (client_path, am_url) = Config().get_max_aggrMgr_info()
156     sys_cmd = "cd " + client_path + "; java -classpath AggregateWS-client-api.jar:lib/* net.geni.aggregate.client.examples." + client_app + " ./repo " + am_url + " " + ' '.join(params)
157     ret = shell_execute(sys_cmd, timeout)
158     sfa_logger().debug("shell_execute cmd: %s returns %s" % (sys_cmd, ret))
159     return ret
160
161
162 def alloc_links(api, hrn, links_to_add, links_to_drop):
163     slicename=hrn_to_pl_slicename(hrn)
164     for (iface,ip) in links_to_add:
165         node = topology[iface][0][0]
166         try:
167             api.plshell.AddSliceTag(api.plauth, slicename, "ip_addresses", ip, node)
168             api.plshell.AddSliceTag(api.plauth, slicename, "vsys", "getvlan", node)
169         except Exception: 
170             # Probably a duplicate tag. XXX July 21
171             pass
172 # save request RSpec xml content to a tmp file
173 def save_rspec_to_file(rspec):
174     path = RSPEC_TMP_FILE_PREFIX + "_" + time.strftime('%Y%m%dT%H:%M:%S', time.gmtime(time.time())) +".xml"
175     file = open(path, "w")
176     file.write(rspec)
177     file.close()
178     return path
179
180 # get stripped down slice id/name plc:maxpl:xi_slice1 --> xi_slice1
181 def get_short_slice_id(cred, hrn):
182     if hrn == None:
183         return None
184     slice_id = hrn[hrn.rfind('+')+1:]
185     if slice_id == None:
186         slice_id = hrn[hrn.rfind(':')+1:]
187     if slice_id == None:
188        return hrn
189        pass
190     return str(slice_id)
191
192 # extract xml 
193 def get_xml_by_tag(text, tag):
194     indx1 = text.find('<'+tag)
195     indx2 = text.find('/'+tag+'>')
196     xml = None
197     if indx1!=-1 and indx2>indx1:
198         xml = text[indx1:indx2+len(tag)+2]
199     return xml
200
201 def create_slice(api, xrn, cred, rspec, users):
202     indx1 = rspec.find("<RSpec")
203     indx2 = rspec.find("</RSpec>")
204     if indx1 > -1 and indx2 > indx1:
205         rspec = rspec[indx1+len("<RSpec type=\"SFA\">"):indx2-1]
206     rspec_path = save_rspec_to_file(rspec)
207     (ret, output) = call_am_apiclient("CreateSliceNetworkClient", [rspec_path,], 3)
208     # parse output ?
209     rspec = "<RSpec type=\"SFA\"> Done! </RSpec>"
210     return True
211
212 def alloc_nodes(api,hrn, requested_ifs):
213     requested_nodes = if_endpoints(requested_ifs)
214     create_slice_max_aggregate(api, hrn, requested_nodes)
215
216 # Taken from slices.py
217
218 def create_slice_max_aggregate(api, hrn, nodes):
219     # Get the slice record 
220     global topology
221     topology = get_interface_map()
222     slice = {}
223     registries = Registries(api)
224     registry = registries[api.hrn]
225     credential = api.getCredential()
226     records = registry.resolve(credential, hrn)
227     for record in records:
228         if record.get_type() in ['slice']:
229             slice = record.as_dict()
230     if not slice:
231         raise RecordNotFound(hrn)   
232
233     # Make sure slice exists at plc, if it doesnt add it
234     slicename = hrn_to_pl_slicename(hrn)
235     slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids'])
236     if not slices:
237         parts = slicename.split("_")
238         login_base = parts[0]
239         # if site doesnt exist add it
240         sites = api.plshell.GetSites(api.plauth, [login_base])
241         if not sites:
242             authority = get_authority(hrn)
243             site_records = registry.resolve(credential, authority)
244             site_record = {}
245             if not site_records:
246                 raise RecordNotFound(authority)
247             site_record = site_records[0]
248             site = site_record.as_dict()
249                 
250             # add the site
251             site.pop('site_id')
252             site_id = api.plshell.AddSite(api.plauth, site)
253         else:
254             site = sites[0]
255             
256         slice_fields = {}
257         slice_keys = ['name', 'url', 'description']
258         for key in slice_keys:
259             if key in slice and slice[key]:
260                 slice_fields[key] = slice[key]  
261         api.plshell.AddSlice(api.plauth, slice_fields)
262         slice = slice_fields
263         slice['node_ids'] = 0
264     else:
265         slice = slices[0]    
266
267     # get the list of valid slice users from the registry and make 
268     # they are added to the slice 
269     researchers = record.get('researcher', [])
270     for researcher in researchers:
271         person_record = {}
272         person_records = registry.resolve(credential, researcher)
273         for record in person_records:
274             if record.get_type() in ['user']:
275                 person_record = record
276         if not person_record:
277             pass
278         person_dict = person_record.as_dict()
279         persons = api.plshell.GetPersons(api.plauth, [person_dict['email']],
280                                          ['person_id', 'key_ids'])
281
282         # Create the person record 
283         if not persons:
284             person_id=api.plshell.AddPerson(api.plauth, person_dict)
285
286             # The line below enables the user account on the remote aggregate
287             # soon after it is created.
288             # without this the user key is not transfered to the slice
289             # (as GetSlivers returns key of only enabled users),
290             # which prevents the user from login to the slice.
291             # We may do additional checks before enabling the user.
292
293             api.plshell.UpdatePerson(api.plauth, person_id, {'enabled' : True})
294             key_ids = []
295         else:
296             key_ids = persons[0]['key_ids']
297
298         api.plshell.AddPersonToSlice(api.plauth, person_dict['email'],
299                                      slicename)        
300
301         # Get this users local keys
302         keylist = api.plshell.GetKeys(api.plauth, key_ids, ['key'])
303         keys = [key['key'] for key in keylist]
304
305         # add keys that arent already there 
306         for personkey in person_dict['keys']:
307             if personkey not in keys:
308                 key = {'key_type': 'ssh', 'key': personkey}
309                 api.plshell.AddPersonKey(api.plauth, person_dict['email'], key)
310
311     # find out where this slice is currently running
312     nodelist = api.plshell.GetNodes(api.plauth, slice['node_ids'],
313                                     ['hostname'])
314     hostnames = [node['hostname'] for node in nodelist]
315
316     # remove nodes not in rspec
317     deleted_nodes = list(set(hostnames).difference(nodes))
318     # add nodes from rspec
319     added_nodes = list(set(nodes).difference(hostnames))
320
321     api.plshell.AddSliceToNodes(api.plauth, slicename, added_nodes) 
322     api.plshell.DeleteSliceFromNodes(api.plauth, slicename, deleted_nodes)
323
324 def delete_slice(api, xrn, cred):
325     slice_id = get_short_slice_id(cred, xrn)
326     (ret, output) = call_am_apiclient("DeleteSliceNetworkClient", [slice_id,], 3)
327     # parse output ?
328     return 1
329
330
331 def get_rspec(api, creds, options):
332     # get slice's hrn from options
333     xrn = options.get('geni_slice_urn', None)
334     hrn, type = urn_to_hrn(xrn)
335     # Eg. config line:
336     # plc.princeton.sapan vlan23,vlan45
337
338     allocations = read_alloc_dict()
339     if (hrn and allocations.has_key(hrn)):
340             ret_rspec = allocations_to_rspec(allocations[hrn])
341 def get_rspec(api, cred, options):
342     #geni_slice_urn: urn:publicid:IDN+plc:maxpl+slice+xi_rspec_test1
343     urn = options.get('geni_slice_urn')
344     slice_id = get_short_slice_id(cred, urn)
345     if slice_id == None:
346         (ret, output) = call_am_apiclient("GetResourceTopology", ['all', '\"\"'], 5)
347     else:
348         ret_rspec = open(SFA_MAX_CANNED_RSPEC).read()
349
350     return (ret_rspec)
351
352
353 def create_slice(api, xrn, creds, rspec_xml, users):
354     global topology
355     hrn = urn_to_hrn(xrn)[0]
356     topology = get_interface_map()
357
358     # Check if everything in rspec is either allocated by hrn
359     # or not allocated at all.
360     r = RSpec()
361     r.parseString(rspec_xml)
362     rspec = r.toDict()
363
364     lock_state_file()
365
366     allocations = read_alloc_dict()
367     requested_allocations = rspec_to_allocations (rspec)
368     current_allocations = collapse_alloc_dict(allocations)
369     try:
370         current_hrn_allocations=allocations[hrn]
371     except KeyError:
372         current_hrn_allocations=[]
373
374     # Check request against current allocations
375     requested_interfaces = map(lambda(elt):elt[0], requested_allocations)
376     current_interfaces = map(lambda(elt):elt[0], current_allocations)
377     current_hrn_interfaces = map(lambda(elt):elt[0], current_hrn_allocations)
378
379     for a in requested_interfaces:
380         if (a not in current_hrn_interfaces and a in current_interfaces):
381             raise SfaOutOfResource(a)
382         if (topology[a][1] not in requested_interfaces):
383             raise SfaNoPairRSpec(a,topology[a][1])
384     # Request OK
385
386     # Allocations to delete
387     allocations_to_delete = []
388     for a in current_hrn_allocations:
389         if (a not in requested_allocations):
390             allocations_to_delete.extend([a])
391
392     # Ok, let's do our thing
393     alloc_nodes(api, hrn, requested_interfaces)
394     alloc_links(api, hrn, requested_allocations, allocations_to_delete)
395     allocations[hrn] = requested_allocations
396     commit_alloc_dict(allocations)
397
398     unlock_state_file()
399
400     return True
401
402 def rspec_to_allocations(rspec):
403     ifs = []
404     try:
405         ifspecs = rspec['rspec']['request'][0]['ifspec']
406         for l in ifspecs:
407             ifs.extend([(l['name'].replace('tns:',''),l['ip'])])
408     except KeyError:
409         # Bad RSpec
410         pass
411     return ifs
412     # xxx - fixme
413     (ret, output) = call_am_apiclient("GetResourceTopology", ['all', slice_id,], 5)
414     # parse output into rspec XML
415     if output.find("No resouce found") > 0:
416         rspec = "<RSpec type=\"SFA\"> <Fault>No resource found</Fault> </RSpec>"
417     else:
418         comp_rspec = get_xml_by_tag(output, 'computeResource')
419         sfa_logger().debug("#### computeResource %s" % comp_rspec)
420         topo_rspec = get_xml_by_tag(output, 'topology')
421         sfa_logger().debug("#### topology %s" % topo_rspec)
422         rspec = "<RSpec type=\"SFA\"> <network name=\"" + Config().get_interface_hrn() + "\">";
423         if comp_rspec != None:
424             rspec = rspec + get_xml_by_tag(output, 'computeResource')
425         if topo_rspec != None:
426             rspec = rspec + get_xml_by_tag(output, 'topology')
427         rspec = rspec + "</network> </RSpec>"
428
429     return (rspec)
430
431 def start_slice(api, xrn, cred):
432     # service not supported
433     return None
434
435 def stop_slice(api, xrn, cred):
436     # service not supported
437     return None
438
439 def reset_slices(api, xrn):
440     # service not supported
441     return None
442
443 """
444 Returns the request context required by sfatables. At some point, this mechanism should be changed
445 to refer to "contexts", which is the information that sfatables is requesting. But for now, we just
446 return the basic information needed in a dict.
447 """
448 def fetch_context(slice_hrn, user_hrn, contexts):
449     base_context = {'sfa':{'user':{'hrn':user_hrn}}}
450     return base_context
451
452 def main():
453     t = get_interface_map()
454     api = SfaAPI()
455     r = RSpec()
456     rspec_xml = open(sys.argv[1]).read()
457     #get_rspec(None,'foo')
458     create_slice(None, "plc.princeton.sap0", rspec_xml)
459     
460     create_slice(api, "plc.maxpl.test000", None, rspec_xml, None)
461
462 if __name__ == "__main__":
463     main()