Improvements on the max rspec manager
[sfa.git] / sfa / rspecs / aggregates / rspec_manager_max.py
1 #!/usr/bin/python
2
3 from sfa.util.rspec import Rspec
4 import sys
5 import pdb
6 from sfa.util.misc import *
7 from sfa.util.rspec import *
8 from sfa.util.specdict import *
9 from sfa.util.faults import *
10 from sfa.util.storage import *
11 from sfa.util.policy import Policy
12 from sfa.util.debug import log
13 from sfa.server.aggregate import Aggregates
14 from sfa.server.registry import Registries
15 from sfa.util.faults import *
16
17 import xml.dom.minidom
18
19 SFA_MAX_CONF_FILE = '/etc/sfa/max_allocations'
20 SFA_MAX_DEFAULT_RSPEC = '/etc/sfa/max_physical.xml'
21
22 topology = {}
23
24 class GeniOutOfResource(GeniFault):
25     def __init__(self, interface):
26         faultString = "Interface " + interface + " not available"
27         GeniFault.__init__(self, 100, faultString, '')
28
29 class GeniNoPairRspec(GeniFault):
30     def __init__(self, interface, interface2):
31         faultString = "Interface " + interface + " should be paired with " + interface2
32         GeniFault.__init__(self, 100, faultString, '')
33
34 # Returns a mapping from interfaces to the nodes they lie on and their peer interfaces
35 # i -> node,i_peer
36
37 def get_interface_map():
38     r = Rspec()
39     r.parseFile(SFA_MAX_DEFAULT_RSPEC)
40     rspec = r.toDict()
41     capacity = rspec['rspec']['capacity']
42     netspec = capacity[0]['netspec'][0]
43     linkdefs = {}
44     for n in netspec['nodespec']:
45         ifspecs = n['ifspec']
46         nodename = n['node']
47         for i in ifspecs:
48             ifname = i['name']
49             linkid = i['linkid']
50
51             if (linkdefs.has_key(linkid)):
52                 linkdefs[linkid].extend([(nodename,ifname)])
53             else:
54                 linkdefs[linkid]=[(nodename,ifname)]
55     
56     # topology maps interface x interface -> link,node1,node2
57     topology={}
58
59     for k in linkdefs.keys():
60         (n1,i1) = linkdefs[k][0]
61         (n2,i2) = linkdefs[k][1]
62
63         topology[i1] = (n1, i2)
64         topology[i2] = (n2, i1)
65         
66
67     return topology    
68
69     
70
71 #def allocations_to_rspec(allocations):
72 #    rspec = xml.minidom.parseFile(SFA_MAX_DEFAULT_RSPEC)
73 #    req = rspec.firstChild.appendChild(rspec.createElement("request"))
74 #    for a in allocations:
75 #        (link,
76         
77     
78 def if_endpoints(ifs):
79     nodes=[]
80     for l in ifs:
81         nodes.extend([topology[l][0]])
82     return nodes
83
84 def lock_state_file():
85     # Noop for demo
86     return True
87
88 def unlock_state_file():
89     return True
90     # Noop for demo
91
92 def read_alloc_dict():
93     alloc_dict={}
94     rows = open(SFA_MAX_CONF_FILE).read().split('\n')
95     for r in rows:
96         columns = r.split(' ')
97         if (len(columns)==2):
98             hrn = columns[0]
99             allocs = columns[1].split(',')
100             ipallocs = map(lambda alloc:alloc.split('/'), allocs)
101             alloc_dict[hrn]=ipallocs
102     return alloc_dict
103
104 def commit_alloc_dict(d):
105     f = open(SFA_MAX_CONF_FILE, 'w')
106     for hrn in d.keys():
107         columns = d[hrn]
108         ipcolumns = map(lambda x:"/".join(x), columns)
109         row = hrn+' '+','.join(ipcolumns)+'\n'
110         f.write(row)
111     f.close()
112
113 def collapse_alloc_dict(d):
114     ret = []
115     for k in d.keys():
116         ret.extend(d[k])
117     return ret
118
119
120 def alloc_links(api, links_to_add, links_to_drop, foo):
121     #for l in links_to_add:
122         #(node1,ip1,node2,ip2) = l
123         #api.plshell.AddSliceTag(api.plauth, [slicename], ['node_ids'])
124     return True
125
126 def alloc_nodes(api,hrn, requested_ifs):
127     
128     requested_nodes = if_endpoints(requested_ifs)
129
130     #create_slice_max_aggregate(api, hrn, requested_nodes)
131
132 # Taken from slices.py
133
134 def create_slice_max_aggregate(api, hrn, nodes):
135     # Get the slice record from geni
136     global topology
137     topology = get_interface_map()
138     slice = {}
139     registries = Registries(api)
140     registry = registries[api.hrn]
141     credential = api.getCredential()
142     records = registry.resolve(credential, hrn)
143     for record in records:
144         if record.get_type() in ['slice']:
145             slice = record.as_dict()
146     if not slice:
147         raise RecordNotFound(hrn)   
148
149     # Make sure slice exists at plc, if it doesnt add it
150     slicename = hrn_to_pl_slicename(hrn)
151     slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids'])
152     if not slices:
153         parts = slicename.split("_")
154         login_base = parts[0]
155         # if site doesnt exist add it
156         sites = api.plshell.GetSites(api.plauth, [login_base])
157         if not sites:
158             authority = get_authority(hrn)
159             site_records = registry.resolve(credential, authority)
160             site_record = {}
161             if not site_records:
162                 raise RecordNotFound(authority)
163             site_record = site_records[0]
164             site = site_record.as_dict()
165                 
166             # add the site
167             site.pop('site_id')
168             site_id = api.plshell.AddSite(api.plauth, site)
169         else:
170             site = sites[0]
171             
172         slice_fields = {}
173         slice_keys = ['name', 'url', 'description']
174         for key in slice_keys:
175             if key in slice and slice[key]:
176                 slice_fields[key] = slice[key]  
177         api.plshell.AddSlice(api.plauth, slice_fields)
178         slice = slice_fields
179         slice['node_ids'] = 0
180     else:
181         slice = slices[0]    
182
183     # get the list of valid slice users from the registry and make 
184     # they are added to the slice 
185     researchers = record.get('researcher', [])
186     for researcher in researchers:
187         person_record = {}
188         person_records = registry.resolve(credential, researcher)
189         for record in person_records:
190             if record.get_type() in ['user']:
191                 person_record = record
192         if not person_record:
193             pass
194         person_dict = person_record.as_dict()
195         persons = api.plshell.GetPersons(api.plauth, [person_dict['email']],
196                                          ['person_id', 'key_ids'])
197
198         # Create the person record 
199         if not persons:
200             person_id=api.plshell.AddPerson(api.plauth, person_dict)
201
202             # The line below enables the user account on the remote aggregate
203             # soon after it is created.
204             # without this the user key is not transfered to the slice
205             # (as GetSlivers returns key of only enabled users),
206             # which prevents the user from login to the slice.
207             # We may do additional checks before enabling the user.
208
209             api.plshell.UpdatePerson(api.plauth, person_id, {'enabled' : True})
210             key_ids = []
211         else:
212             key_ids = persons[0]['key_ids']
213
214         api.plshell.AddPersonToSlice(api.plauth, person_dict['email'],
215                                      slicename)        
216
217         # Get this users local keys
218         keylist = api.plshell.GetKeys(api.plauth, key_ids, ['key'])
219         keys = [key['key'] for key in keylist]
220
221         # add keys that arent already there 
222         for personkey in person_dict['keys']:
223             if personkey not in keys:
224                 key = {'key_type': 'ssh', 'key': personkey}
225                 api.plshell.AddPersonKey(api.plauth, person_dict['email'], key)
226
227     # find out where this slice is currently running
228     nodelist = api.plshell.GetNodes(api.plauth, slice['node_ids'],
229                                     ['hostname'])
230     hostnames = [node['hostname'] for node in nodelist]
231
232     # remove nodes not in rspec
233     deleted_nodes = list(set(hostnames).difference(nodes))
234     # add nodes from rspec
235     added_nodes = list(set(nodes).difference(hostnames))
236
237     api.plshell.AddSliceToNodes(api.plauth, slicename, added_nodes) 
238     api.plshell.DeleteSliceFromNodes(api.plauth, slicename, deleted_nodes)
239
240     return 1
241
242
243 def get_rspec(api, hrn):
244     # Eg. config line:
245     # plc.princeton.sapan vlan23,vlan45
246
247     allocations = read_alloc_dict()
248     if (hrn):
249         ret_rspec = allocations_to_rspec(allocations[hrn])
250     else:
251         ret_rspec = open(SFA_MAX_DEFAULT_RSPEC).read()
252
253     return (ret_rspec)
254
255
256 def create_slice(api, hrn, rspec_xml):
257     global topology
258     topology = get_interface_map()
259
260     # Check if everything in rspec is either allocated by hrn
261     # or not allocated at all.
262     r = Rspec()
263     r.parseString(rspec_xml)
264     rspec = r.toDict()
265
266     lock_state_file()
267
268     allocations = read_alloc_dict()
269     requested_allocations = rspec_to_allocations (rspec)
270     current_allocations = collapse_alloc_dict(allocations)
271     try:
272         current_hrn_allocations=allocations[hrn]
273     except KeyError:
274         current_hrn_allocations=[]
275
276     # Check request against current allocations
277     requested_interfaces = map(lambda(elt):elt[0], requested_allocations)
278     current_interfaces = map(lambda(elt):elt[0], current_allocations)
279     current_hrn_interfaces = map(lambda(elt):elt[0], current_hrn_allocations)
280
281     pdb.set_trace()
282     for a in requested_interfaces:
283         if (a not in current_hrn_interfaces and a in current_interfaces):
284             raise GeniOutOfResource(a)
285         if (topology[a][1] not in requested_interfaces):
286             raise GeniNoPairRspec(a,topology[a][1])
287     # Request OK
288
289     # Allocations to delete
290     allocations_to_delete = []
291     for a in current_hrn_allocations:
292         if (a not in requested_allocations):
293             allocations_to_delete.extend([a])
294
295     # Ok, let's do our thing
296     alloc_nodes(api, hrn, requested_interfaces)
297     alloc_links(api, hrn, requested_allocations, allocations_to_delete)
298     allocations[hrn] = requested_allocations
299     commit_alloc_dict(allocations)
300
301     unlock_state_file()
302
303     return True
304
305 def rspec_to_allocations(rspec):
306     ifs = []
307     try:
308         ifspecs = rspec['rspec']['request'][0]['ifspec']
309         for l in ifspecs:
310             ifs.extend([(l['name'].replace('tns:',''),l['ip'])])
311     except KeyError:
312         # Bad Rspec
313         pass
314     return ifs
315
316 def main():
317     t = get_interface_map()
318     r = Rspec()
319     rspec_xml = open(sys.argv[1]).read()
320     create_slice(None,'foo',rspec_xml)
321     
322 if __name__ == "__main__":
323     main()