Merge branch 'master' of ssh://git.onelab.eu/git/sfa
[sfa.git] / sfa / managers / aggregate_manager_max.py
1 #!/usr/bin/python
2
3 from sfa.util.rspec import RSpec
4 import sys
5 import pdb
6 from sfa.util.xrn import urn_to_hrn, hrn_to_urn, get_authority
7 from sfa.util.plxrn import hrn_to_pl_slicename
8 from sfa.util.plxrn import hrn_to_pl_slicename
9 from sfa.util.rspec import *
10 from sfa.util.specdict import *
11 from sfa.util.faults import *
12 from sfa.util.storage import *
13 from sfa.util.policy import Policy
14 from sfa.server.aggregate import Aggregates
15 from sfa.server.registry import Registries
16 from sfa.util.faults import *
17
18 import xml.dom.minidom
19
20 SFA_MAX_CONF_FILE = '/etc/sfa/max_allocations'
21 SFA_MAX_DEFAULT_RSPEC = '/etc/sfa/max_physical.xml'
22 SFA_MAX_CANNED_RSPEC = '/etc/sfa/max_physical_canned.xml'
23
24 topology = {}
25
26 class SfaOutOfResource(SfaFault):
27     def __init__(self, interface):
28         faultString = "Interface " + interface + " not available"
29         SfaFault.__init__(self, 100, faultString, '')
30
31 class SfaNoPairRSpec(SfaFault):
32     def __init__(self, interface, interface2):
33         faultString = "Interface " + interface + " should be paired with " + interface2
34         SfaFault.__init__(self, 100, faultString, '')
35
36 # Returns a mapping from interfaces to the nodes they lie on and their peer interfaces
37 # i -> node,i_peer
38
39 def get_interface_map():
40     r = RSpec()
41     r.parseFile(SFA_MAX_DEFAULT_RSPEC)
42     rspec = r.toDict()
43     capacity = rspec['rspec']['capacity']
44     netspec = capacity[0]['netspec'][0]
45     linkdefs = {}
46     for n in netspec['nodespec']:
47         ifspecs = n['ifspec']
48         nodename = n['node']
49         for i in ifspecs:
50             ifname = i['name']
51             linkid = i['linkid']
52
53             if (linkdefs.has_key(linkid)):
54                 linkdefs[linkid].extend([(nodename,ifname)])
55             else:
56                 linkdefs[linkid]=[(nodename,ifname)]
57     
58     # topology maps interface x interface -> link,node1,node2
59     topology={}
60
61     for k in linkdefs.keys():
62         (n1,i1) = linkdefs[k][0]
63         (n2,i2) = linkdefs[k][1]
64
65         topology[i1] = (n1, i2)
66         topology[i2] = (n2, i1)
67         
68
69     return topology    
70
71     
72 def allocations_to_rspec(allocations):
73     rspec = xml.dom.minidom.parse(SFA_MAX_DEFAULT_RSPEC)
74     req = rspec.firstChild.appendChild(rspec.createElement("request"))
75     for (iname,ip) in allocations:
76         ifspec = req.appendChild(rspec.createElement("ifspec"))
77         ifspec.setAttribute("name","tns:"+iname)
78         ifspec.setAttribute("ip",ip)
79
80     return rspec.toxml()
81         
82     
83 def if_endpoints(ifs):
84     nodes=[]
85     for l in ifs:
86         nodes.extend(topology[l][0])
87     return nodes
88
89 def lock_state_file():
90     # Noop for demo
91     return True
92
93 def unlock_state_file():
94     return True
95     # Noop for demo
96
97 def read_alloc_dict():
98     alloc_dict={}
99     rows = open(SFA_MAX_CONF_FILE).read().split('\n')
100     for r in rows:
101         columns = r.split(' ')
102         if (len(columns)==2):
103             hrn = columns[0]
104             allocs = columns[1].split(',')
105             ipallocs = map(lambda alloc:alloc.split('/'), allocs)
106             alloc_dict[hrn]=ipallocs
107     return alloc_dict
108
109 def commit_alloc_dict(d):
110     f = open(SFA_MAX_CONF_FILE, 'w')
111     for hrn in d.keys():
112         columns = d[hrn]
113         ipcolumns = map(lambda x:"/".join(x), columns)
114         row = hrn+' '+','.join(ipcolumns)+'\n'
115         f.write(row)
116     f.close()
117
118 def collapse_alloc_dict(d):
119     ret = []
120     for k in d.keys():
121         ret.extend(d[k])
122     return ret
123
124
125 def alloc_links(api, hrn, links_to_add, links_to_drop):
126     slicename=hrn_to_pl_slicename(hrn)
127     for (iface,ip) in links_to_add:
128         node = topology[iface][0][0]
129         try:
130             api.plshell.AddSliceTag(api.plauth, slicename, "ip_addresses", ip, node)
131             api.plshell.AddSliceTag(api.plauth, slicename, "vsys", "getvlan", node)
132         except Exception: 
133             # Probably a duplicate tag. XXX July 21
134             pass
135     return True
136
137 def alloc_nodes(api,hrn, requested_ifs):
138     requested_nodes = if_endpoints(requested_ifs)
139     create_slice_max_aggregate(api, hrn, requested_nodes)
140
141 # Taken from slices.py
142
143 def create_slice_max_aggregate(api, hrn, nodes):
144     # Get the slice record 
145     global topology
146     topology = get_interface_map()
147     slice = {}
148     registries = Registries(api)
149     registry = registries[api.hrn]
150     credential = api.getCredential()
151     urn = hrn_to_urn(hrn, 'slice')
152     records = registry.Resolve(urn, credential)
153     for record in records:
154         if record.get_type() in ['slice']:
155             slice = record.as_dict()
156     if not slice:
157         raise RecordNotFound(hrn)   
158
159     # Make sure slice exists at plc, if it doesnt add it
160     slicename = hrn_to_pl_slicename(hrn)
161     slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids'])
162     if not slices:
163         parts = slicename.split("_")
164         login_base = parts[0]
165         # if site doesnt exist add it
166         sites = api.plshell.GetSites(api.plauth, [login_base])
167         if not sites:
168             authority = get_authority(hrn)
169             authority_urn = hrn_to_urn(authority, 'authority')
170             site_records = registry.Resolve(authority_urn, credential)
171             site_record = {}
172             if not site_records:
173                 raise RecordNotFound(authority)
174             site_record = site_records[0]
175             site = site_record.as_dict()
176                 
177             # add the site
178             site.pop('site_id')
179             site_id = api.plshell.AddSite(api.plauth, site)
180         else:
181             site = sites[0]
182             
183         slice_fields = {}
184         slice_keys = ['name', 'url', 'description']
185         for key in slice_keys:
186             if key in slice and slice[key]:
187                 slice_fields[key] = slice[key]  
188         api.plshell.AddSlice(api.plauth, slice_fields)
189         slice = slice_fields
190         slice['node_ids'] = 0
191     else:
192         slice = slices[0]    
193
194     # get the list of valid slice users from the registry and make 
195     # they are added to the slice 
196     researchers = record.get('researcher', [])
197     for researcher in researchers:
198         person_record = {}
199         researcher_urn = hrn_to_urn(researcher, 'user')
200         person_records = registry.Resolve(researcher_urn, credential)
201         for record in person_records:
202             if record.get_type() in ['user']:
203                 person_record = record
204         if not person_record:
205             pass
206         person_dict = person_record.as_dict()
207         persons = api.plshell.GetPersons(api.plauth, [person_dict['email']],
208                                          ['person_id', 'key_ids'])
209
210         # Create the person record 
211         if not persons:
212             person_id=api.plshell.AddPerson(api.plauth, person_dict)
213
214             # The line below enables the user account on the remote aggregate
215             # soon after it is created.
216             # without this the user key is not transfered to the slice
217             # (as GetSlivers returns key of only enabled users),
218             # which prevents the user from login to the slice.
219             # We may do additional checks before enabling the user.
220
221             api.plshell.UpdatePerson(api.plauth, person_id, {'enabled' : True})
222             key_ids = []
223         else:
224             key_ids = persons[0]['key_ids']
225
226         api.plshell.AddPersonToSlice(api.plauth, person_dict['email'],
227                                      slicename)        
228
229         # Get this users local keys
230         keylist = api.plshell.GetKeys(api.plauth, key_ids, ['key'])
231         keys = [key['key'] for key in keylist]
232
233         # add keys that arent already there 
234         for personkey in person_dict['keys']:
235             if personkey not in keys:
236                 key = {'key_type': 'ssh', 'key': personkey}
237                 api.plshell.AddPersonKey(api.plauth, person_dict['email'], key)
238
239     # find out where this slice is currently running
240     nodelist = api.plshell.GetNodes(api.plauth, slice['node_ids'],
241                                     ['hostname'])
242     hostnames = [node['hostname'] for node in nodelist]
243
244     # remove nodes not in rspec
245     deleted_nodes = list(set(hostnames).difference(nodes))
246     # add nodes from rspec
247     added_nodes = list(set(nodes).difference(hostnames))
248
249     api.plshell.AddSliceToNodes(api.plauth, slicename, added_nodes) 
250     api.plshell.DeleteSliceFromNodes(api.plauth, slicename, deleted_nodes)
251
252     return 1
253
254
255 def get_rspec(api, creds, options):
256     # get slice's hrn from options
257     xrn = options.get('geni_slice_urn', '')
258     hrn, type = urn_to_hrn(xrn)
259     # Eg. config line:
260     # plc.princeton.sapan vlan23,vlan45
261
262     allocations = read_alloc_dict()
263     if (hrn and allocations.has_key(hrn)):
264             ret_rspec = allocations_to_rspec(allocations[hrn])
265     else:
266         ret_rspec = open(SFA_MAX_CANNED_RSPEC).read()
267
268     return (ret_rspec)
269
270
271 def create_slice(api, xrn, creds, rspec_xml, users):
272     global topology
273     hrn = urn_to_hrn(xrn)[0]
274     topology = get_interface_map()
275
276     # Check if everything in rspec is either allocated by hrn
277     # or not allocated at all.
278     r = RSpec()
279     r.parseString(rspec_xml)
280     rspec = r.toDict()
281
282     lock_state_file()
283
284     allocations = read_alloc_dict()
285     requested_allocations = rspec_to_allocations (rspec)
286     current_allocations = collapse_alloc_dict(allocations)
287     try:
288         current_hrn_allocations=allocations[hrn]
289     except KeyError:
290         current_hrn_allocations=[]
291
292     # Check request against current allocations
293     requested_interfaces = map(lambda(elt):elt[0], requested_allocations)
294     current_interfaces = map(lambda(elt):elt[0], current_allocations)
295     current_hrn_interfaces = map(lambda(elt):elt[0], current_hrn_allocations)
296
297     for a in requested_interfaces:
298         if (a not in current_hrn_interfaces and a in current_interfaces):
299             raise SfaOutOfResource(a)
300         if (topology[a][1] not in requested_interfaces):
301             raise SfaNoPairRSpec(a,topology[a][1])
302     # Request OK
303
304     # Allocations to delete
305     allocations_to_delete = []
306     for a in current_hrn_allocations:
307         if (a not in requested_allocations):
308             allocations_to_delete.extend([a])
309
310     # Ok, let's do our thing
311     alloc_nodes(api, hrn, requested_interfaces)
312     alloc_links(api, hrn, requested_allocations, allocations_to_delete)
313     allocations[hrn] = requested_allocations
314     commit_alloc_dict(allocations)
315
316     unlock_state_file()
317
318     return True
319
320 def rspec_to_allocations(rspec):
321     ifs = []
322     try:
323         ifspecs = rspec['rspec']['request'][0]['ifspec']
324         for l in ifspecs:
325             ifs.extend([(l['name'].replace('tns:',''),l['ip'])])
326     except KeyError:
327         # Bad RSpec
328         pass
329     return ifs
330
331 def main():
332     t = get_interface_map()
333     r = RSpec()
334     rspec_xml = open(sys.argv[1]).read()
335     #get_rspec(None,'foo')
336     create_slice(None, "plc.princeton.sap0", rspec_xml)
337     
338 if __name__ == "__main__":
339     main()