setting sfa-0.9-16 tag
[sfa.git] / sfa / managers / aggregate_manager_max.py
1 #!/usr/bin/python
2
3 from sfa.util.rspec import RSpec
4 import sys
5 import pdb
6 from sfa.util.namespace import *
7 from sfa.util.rspec import *
8 from sfa.util.specdict import *
9 from sfa.util.faults import *
10 from sfa.util.storage import *
11 from sfa.util.policy import Policy
12 from sfa.util.debug import log
13 from sfa.server.aggregate import Aggregates
14 from sfa.server.registry import Registries
15 from sfa.util.faults import *
16
17 import xml.dom.minidom
18
19 SFA_MAX_CONF_FILE = '/etc/sfa/max_allocations'
20 SFA_MAX_DEFAULT_RSPEC = '/etc/sfa/max_physical.xml'
21 SFA_MAX_CANNED_RSPEC = '/etc/sfa/max_physical_canned.xml'
22
23 topology = {}
24
25 class SfaOutOfResource(SfaFault):
26     def __init__(self, interface):
27         faultString = "Interface " + interface + " not available"
28         SfaFault.__init__(self, 100, faultString, '')
29
30 class SfaNoPairRSpec(SfaFault):
31     def __init__(self, interface, interface2):
32         faultString = "Interface " + interface + " should be paired with " + interface2
33         SfaFault.__init__(self, 100, faultString, '')
34
35 # Returns a mapping from interfaces to the nodes they lie on and their peer interfaces
36 # i -> node,i_peer
37
38 def get_interface_map():
39     r = RSpec()
40     r.parseFile(SFA_MAX_DEFAULT_RSPEC)
41     rspec = r.toDict()
42     capacity = rspec['rspec']['capacity']
43     netspec = capacity[0]['netspec'][0]
44     linkdefs = {}
45     for n in netspec['nodespec']:
46         ifspecs = n['ifspec']
47         nodename = n['node']
48         for i in ifspecs:
49             ifname = i['name']
50             linkid = i['linkid']
51
52             if (linkdefs.has_key(linkid)):
53                 linkdefs[linkid].extend([(nodename,ifname)])
54             else:
55                 linkdefs[linkid]=[(nodename,ifname)]
56     
57     # topology maps interface x interface -> link,node1,node2
58     topology={}
59
60     for k in linkdefs.keys():
61         (n1,i1) = linkdefs[k][0]
62         (n2,i2) = linkdefs[k][1]
63
64         topology[i1] = (n1, i2)
65         topology[i2] = (n2, i1)
66         
67
68     return topology    
69
70     
71 def allocations_to_rspec(allocations):
72     rspec = xml.dom.minidom.parse(SFA_MAX_DEFAULT_RSPEC)
73     req = rspec.firstChild.appendChild(rspec.createElement("request"))
74     for (iname,ip) in allocations:
75         ifspec = req.appendChild(rspec.createElement("ifspec"))
76         ifspec.setAttribute("name","tns:"+iname)
77         ifspec.setAttribute("ip",ip)
78
79     return rspec.toxml()
80         
81     
82 def if_endpoints(ifs):
83     nodes=[]
84     for l in ifs:
85         nodes.extend(topology[l][0])
86     return nodes
87
88 def lock_state_file():
89     # Noop for demo
90     return True
91
92 def unlock_state_file():
93     return True
94     # Noop for demo
95
96 def read_alloc_dict():
97     alloc_dict={}
98     rows = open(SFA_MAX_CONF_FILE).read().split('\n')
99     for r in rows:
100         columns = r.split(' ')
101         if (len(columns)==2):
102             hrn = columns[0]
103             allocs = columns[1].split(',')
104             ipallocs = map(lambda alloc:alloc.split('/'), allocs)
105             alloc_dict[hrn]=ipallocs
106     return alloc_dict
107
108 def commit_alloc_dict(d):
109     f = open(SFA_MAX_CONF_FILE, 'w')
110     for hrn in d.keys():
111         columns = d[hrn]
112         ipcolumns = map(lambda x:"/".join(x), columns)
113         row = hrn+' '+','.join(ipcolumns)+'\n'
114         f.write(row)
115     f.close()
116
117 def collapse_alloc_dict(d):
118     ret = []
119     for k in d.keys():
120         ret.extend(d[k])
121     return ret
122
123
124 def alloc_links(api, hrn, links_to_add, links_to_drop):
125     slicename=hrn_to_pl_slicename(hrn)
126     for (iface,ip) in links_to_add:
127         node = topology[iface][0][0]
128         try:
129             api.plshell.AddSliceTag(api.plauth, slicename, "ip_addresses", ip, node)
130             api.plshell.AddSliceTag(api.plauth, slicename, "vsys", "getvlan", node)
131         except Exception: 
132             # Probably a duplicate tag. XXX July 21
133             pass
134     return True
135
136 def alloc_nodes(api,hrn, requested_ifs):
137     requested_nodes = if_endpoints(requested_ifs)
138     create_slice_max_aggregate(api, hrn, requested_nodes)
139
140 # Taken from slices.py
141
142 def create_slice_max_aggregate(api, hrn, nodes):
143     # Get the slice record 
144     global topology
145     topology = get_interface_map()
146     slice = {}
147     registries = Registries(api)
148     registry = registries[api.hrn]
149     credential = api.getCredential()
150     records = registry.resolve(credential, hrn)
151     for record in records:
152         if record.get_type() in ['slice']:
153             slice = record.as_dict()
154     if not slice:
155         raise RecordNotFound(hrn)   
156
157     # Make sure slice exists at plc, if it doesnt add it
158     slicename = hrn_to_pl_slicename(hrn)
159     slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids'])
160     if not slices:
161         parts = slicename.split("_")
162         login_base = parts[0]
163         # if site doesnt exist add it
164         sites = api.plshell.GetSites(api.plauth, [login_base])
165         if not sites:
166             authority = get_authority(hrn)
167             site_records = registry.resolve(credential, authority)
168             site_record = {}
169             if not site_records:
170                 raise RecordNotFound(authority)
171             site_record = site_records[0]
172             site = site_record.as_dict()
173                 
174             # add the site
175             site.pop('site_id')
176             site_id = api.plshell.AddSite(api.plauth, site)
177         else:
178             site = sites[0]
179             
180         slice_fields = {}
181         slice_keys = ['name', 'url', 'description']
182         for key in slice_keys:
183             if key in slice and slice[key]:
184                 slice_fields[key] = slice[key]  
185         api.plshell.AddSlice(api.plauth, slice_fields)
186         slice = slice_fields
187         slice['node_ids'] = 0
188     else:
189         slice = slices[0]    
190
191     # get the list of valid slice users from the registry and make 
192     # they are added to the slice 
193     researchers = record.get('researcher', [])
194     for researcher in researchers:
195         person_record = {}
196         person_records = registry.resolve(credential, researcher)
197         for record in person_records:
198             if record.get_type() in ['user']:
199                 person_record = record
200         if not person_record:
201             pass
202         person_dict = person_record.as_dict()
203         persons = api.plshell.GetPersons(api.plauth, [person_dict['email']],
204                                          ['person_id', 'key_ids'])
205
206         # Create the person record 
207         if not persons:
208             person_id=api.plshell.AddPerson(api.plauth, person_dict)
209
210             # The line below enables the user account on the remote aggregate
211             # soon after it is created.
212             # without this the user key is not transfered to the slice
213             # (as GetSlivers returns key of only enabled users),
214             # which prevents the user from login to the slice.
215             # We may do additional checks before enabling the user.
216
217             api.plshell.UpdatePerson(api.plauth, person_id, {'enabled' : True})
218             key_ids = []
219         else:
220             key_ids = persons[0]['key_ids']
221
222         api.plshell.AddPersonToSlice(api.plauth, person_dict['email'],
223                                      slicename)        
224
225         # Get this users local keys
226         keylist = api.plshell.GetKeys(api.plauth, key_ids, ['key'])
227         keys = [key['key'] for key in keylist]
228
229         # add keys that arent already there 
230         for personkey in person_dict['keys']:
231             if personkey not in keys:
232                 key = {'key_type': 'ssh', 'key': personkey}
233                 api.plshell.AddPersonKey(api.plauth, person_dict['email'], key)
234
235     # find out where this slice is currently running
236     nodelist = api.plshell.GetNodes(api.plauth, slice['node_ids'],
237                                     ['hostname'])
238     hostnames = [node['hostname'] for node in nodelist]
239
240     # remove nodes not in rspec
241     deleted_nodes = list(set(hostnames).difference(nodes))
242     # add nodes from rspec
243     added_nodes = list(set(nodes).difference(hostnames))
244
245     api.plshell.AddSliceToNodes(api.plauth, slicename, added_nodes) 
246     api.plshell.DeleteSliceFromNodes(api.plauth, slicename, deleted_nodes)
247
248     return 1
249
250
251 def get_rspec(api, creds, options):
252     # get slice's hrn from options
253     xrn = options.get('geni_slice_urn', None)
254     hrn, type = urn_to_hrn(xrn)
255     # Eg. config line:
256     # plc.princeton.sapan vlan23,vlan45
257
258     allocations = read_alloc_dict()
259     if (hrn and allocations.has_key(hrn)):
260             ret_rspec = allocations_to_rspec(allocations[hrn])
261     else:
262         ret_rspec = open(SFA_MAX_CANNED_RSPEC).read()
263
264     return (ret_rspec)
265
266
267 def create_slice(api, xrn, creds, rspec_xml, users):
268     global topology
269     hrn = urn_to_hrn(xrn)[0]
270     topology = get_interface_map()
271
272     # Check if everything in rspec is either allocated by hrn
273     # or not allocated at all.
274     r = RSpec()
275     r.parseString(rspec_xml)
276     rspec = r.toDict()
277
278     lock_state_file()
279
280     allocations = read_alloc_dict()
281     requested_allocations = rspec_to_allocations (rspec)
282     current_allocations = collapse_alloc_dict(allocations)
283     try:
284         current_hrn_allocations=allocations[hrn]
285     except KeyError:
286         current_hrn_allocations=[]
287
288     # Check request against current allocations
289     requested_interfaces = map(lambda(elt):elt[0], requested_allocations)
290     current_interfaces = map(lambda(elt):elt[0], current_allocations)
291     current_hrn_interfaces = map(lambda(elt):elt[0], current_hrn_allocations)
292
293     for a in requested_interfaces:
294         if (a not in current_hrn_interfaces and a in current_interfaces):
295             raise SfaOutOfResource(a)
296         if (topology[a][1] not in requested_interfaces):
297             raise SfaNoPairRSpec(a,topology[a][1])
298     # Request OK
299
300     # Allocations to delete
301     allocations_to_delete = []
302     for a in current_hrn_allocations:
303         if (a not in requested_allocations):
304             allocations_to_delete.extend([a])
305
306     # Ok, let's do our thing
307     alloc_nodes(api, hrn, requested_interfaces)
308     alloc_links(api, hrn, requested_allocations, allocations_to_delete)
309     allocations[hrn] = requested_allocations
310     commit_alloc_dict(allocations)
311
312     unlock_state_file()
313
314     return True
315
316 def rspec_to_allocations(rspec):
317     ifs = []
318     try:
319         ifspecs = rspec['rspec']['request'][0]['ifspec']
320         for l in ifspecs:
321             ifs.extend([(l['name'].replace('tns:',''),l['ip'])])
322     except KeyError:
323         # Bad RSpec
324         pass
325     return ifs
326
327 def main():
328     t = get_interface_map()
329     r = RSpec()
330     rspec_xml = open(sys.argv[1]).read()
331     #get_rspec(None,'foo')
332     create_slice(None, "plc.princeton.sap0", rspec_xml)
333     
334 if __name__ == "__main__":
335     main()