build (plc.)Aggregate from a driver and not from an api
[sfa.git] / sfa / managers / aggregate_manager.py
1 import datetime
2 import time
3 import sys
4
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
7 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
8 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
9 from sfa.util.version import version_core
10 from sfa.util.sfatime import utcparse
11 from sfa.util.callids import Callids
12
13 from sfa.trust.sfaticket import SfaTicket
14 from sfa.trust.credential import Credential
15
16 from sfa.rspecs.version_manager import VersionManager
17 from sfa.rspecs.rspec import RSpec
18
19 from sfa.server.sfaapi import SfaApi
20
21 import sfa.plc.peers as peers
22 from sfa.plc.aggregate import Aggregate
23 from sfa.plc.slices import Slices
24
25 class AggregateManager:
26
27     def __init__ (self):
28         # xxx Thierry : caching at the aggregate level sounds wrong...
29         self.caching=True
30         #self.caching=False
31     
32     def GetVersion(self, api, options={}):
33     
34         version_manager = VersionManager()
35         ad_rspec_versions = []
36         request_rspec_versions = []
37         for rspec_version in version_manager.versions:
38             if rspec_version.content_type in ['*', 'ad']:
39                 ad_rspec_versions.append(rspec_version.to_dict())
40             if rspec_version.content_type in ['*', 'request']:
41                 request_rspec_versions.append(rspec_version.to_dict()) 
42         xrn=Xrn(api.hrn)
43         version_more = {'interface':'aggregate',
44                         'sfa': 2,
45                         'geni_api': api.config.SFA_AGGREGATE_API_VERSION,
46                         'testbed':'myplc',
47                         'hrn':xrn.get_hrn(),
48                         'geni_request_rspec_versions': request_rspec_versions,
49                         'geni_ad_rspec_versions': ad_rspec_versions,
50                         }
51         return version_core(version_more)
52     
53     def _get_registry_objects(self, slice_xrn, creds, users):
54         """
55     
56         """
57         hrn, _ = urn_to_hrn(slice_xrn)
58     
59         #hrn_auth = get_authority(hrn)
60     
61         # Build up objects that an SFA registry would return if SFA
62         # could contact the slice's registry directly
63         reg_objects = None
64     
65         if users:
66             # dont allow special characters in the site login base
67             #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
68             #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
69             slicename = hrn_to_pl_slicename(hrn)
70             login_base = slicename.split('_')[0]
71             reg_objects = {}
72             site = {}
73             site['site_id'] = 0
74             site['name'] = 'geni.%s' % login_base 
75             site['enabled'] = True
76             site['max_slices'] = 100
77     
78             # Note:
79             # Is it okay if this login base is the same as one already at this myplc site?
80             # Do we need uniqueness?  Should use hrn_auth instead of just the leaf perhaps?
81             site['login_base'] = login_base
82             site['abbreviated_name'] = login_base
83             site['max_slivers'] = 1000
84             reg_objects['site'] = site
85     
86             slice = {}
87             
88             # get_expiration always returns a normalized datetime - no need to utcparse
89             extime = Credential(string=creds[0]).get_expiration()
90             # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
91             if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
92                 extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
93             slice['expires'] = int(time.mktime(extime.timetuple()))
94             slice['hrn'] = hrn
95             slice['name'] = hrn_to_pl_slicename(hrn)
96             slice['url'] = hrn
97             slice['description'] = hrn
98             slice['pointer'] = 0
99             reg_objects['slice_record'] = slice
100     
101             reg_objects['users'] = {}
102             for user in users:
103                 user['key_ids'] = []
104                 hrn, _ = urn_to_hrn(user['urn'])
105                 user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
106                 user['first_name'] = hrn
107                 user['last_name'] = hrn
108                 reg_objects['users'][user['email']] = user
109     
110             return reg_objects
111     
112     def SliverStatus(self, api, slice_xrn, creds, options={}):
113         call_id = options.get('call_id')
114         if Callids().already_handled(call_id): return {}
115     
116         (hrn, _) = urn_to_hrn(slice_xrn)
117         # find out where this slice is currently running
118         slicename = hrn_to_pl_slicename(hrn)
119         
120         slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
121         if len(slices) == 0:        
122             raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
123         slice = slices[0]
124         
125         # report about the local nodes only
126         nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
127                                      ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
128         site_ids = [node['site_id'] for node in nodes]
129     
130         result = {}
131         top_level_status = 'unknown'
132         if nodes:
133             top_level_status = 'ready'
134         slice_urn = Xrn(slice_xrn, 'slice').get_urn()
135         result['geni_urn'] = slice_urn
136         result['pl_login'] = slice['name']
137         result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
138         
139         resources = []
140         for node in nodes:
141             res = {}
142             res['pl_hostname'] = node['hostname']
143             res['pl_boot_state'] = node['boot_state']
144             res['pl_last_contact'] = node['last_contact']
145             if node['last_contact'] is not None:
146                 res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
147             sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) 
148             res['geni_urn'] = sliver_id
149             if node['boot_state'] == 'boot':
150                 res['geni_status'] = 'ready'
151             else:
152                 res['geni_status'] = 'failed'
153                 top_level_status = 'failed' 
154                 
155             res['geni_error'] = ''
156     
157             resources.append(res)
158             
159         result['geni_status'] = top_level_status
160         result['geni_resources'] = resources
161         return result
162     
163     def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, options={}):
164         """
165         Create the sliver[s] (slice) at this aggregate.    
166         Verify HRN and initialize the slice record in PLC if necessary.
167         """
168         call_id = options.get('call_id')
169         if Callids().already_handled(call_id): return ""
170     
171         aggregate = Aggregate(self.driver)
172         slices = Slices(api)
173         (hrn, _) = urn_to_hrn(slice_xrn)
174         peer = slices.get_peer(hrn)
175         sfa_peer = slices.get_sfa_peer(hrn)
176         slice_record=None    
177         if users:
178             slice_record = users[0].get('slice_record', {})
179     
180         # parse rspec
181         rspec = RSpec(rspec_string)
182         requested_attributes = rspec.version.get_slice_attributes()
183         
184         # ensure site record exists
185         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
186         # ensure slice record exists
187         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
188         # ensure person records exists
189         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
190         # ensure slice attributes exists
191         slices.verify_slice_attributes(slice, requested_attributes)
192         
193         # add/remove slice from nodes
194         requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
195         slices.verify_slice_nodes(slice, requested_slivers, peer) 
196    
197         # add/remove links links 
198         slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
199     
200         # handle MyPLC peer association.
201         # only used by plc and ple.
202         slices.handle_peer(site, slice, persons, peer)
203         
204         return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
205     
206     
207     def RenewSliver(self, api, xrn, creds, expiration_time, options={}):
208         call_id = options.get('call_id')
209         if Callids().already_handled(call_id): return True
210         (hrn, _) = urn_to_hrn(xrn)
211         slicename = hrn_to_pl_slicename(hrn)
212         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
213         if not slices:
214             raise RecordNotFound(hrn)
215         slice = slices[0]
216         requested_time = utcparse(expiration_time)
217         record = {'expires': int(time.mktime(requested_time.timetuple()))}
218         try:
219             api.driver.UpdateSlice(slice['slice_id'], record)
220             return True
221         except:
222             return False
223     
224     def start_slice(self, api, xrn, creds):
225         (hrn, _) = urn_to_hrn(xrn)
226         slicename = hrn_to_pl_slicename(hrn)
227         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
228         if not slices:
229             raise RecordNotFound(hrn)
230         slice_id = slices[0]['slice_id']
231         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
232         # just remove the tag if it exists
233         if slice_tags:
234             api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
235     
236         return 1
237      
238     def stop_slice(self, api, xrn, creds):
239         hrn, _ = urn_to_hrn(xrn)
240         slicename = hrn_to_pl_slicename(hrn)
241         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
242         if not slices:
243             raise RecordNotFound(hrn)
244         slice_id = slices[0]['slice_id']
245         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
246         if not slice_tags:
247             api.driver.AddSliceTag(slice_id, 'enabled', '0')
248         elif slice_tags[0]['value'] != "0":
249             tag_id = slice_tags[0]['slice_tag_id']
250             api.driver.UpdateSliceTag(tag_id, '0')
251         return 1
252     
253     def reset_slice(self, api, xrn):
254         # XX not implemented at this interface
255         return 1
256     
257     def DeleteSliver(self, api, xrn, creds, options={}):
258         call_id = options.get('call_id')
259         if Callids().already_handled(call_id): return ""
260         (hrn, _) = urn_to_hrn(xrn)
261         slicename = hrn_to_pl_slicename(hrn)
262         slices = api.driver.GetSlices({'name': slicename})
263         if not slices:
264             return 1
265         slice = slices[0]
266     
267         # determine if this is a peer slice
268         peer = peers.get_peer(api, hrn)
269         try:
270             if peer:
271                 api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
272             api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
273         finally:
274             if peer:
275                 api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
276         return 1
277     
278     def ListSlices(self, api, creds, options={}):
279         call_id = options.get('call_id')
280         if Callids().already_handled(call_id): return []
281         # look in cache first
282         if self.caching and api.cache:
283             slices = api.cache.get('slices')
284             if slices:
285                 return slices
286     
287         # get data from db 
288         slices = api.driver.GetSlices({'peer_id': None}, ['name'])
289         slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
290         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
291     
292         # cache the result
293         if self.caching and api.cache:
294             api.cache.add('slices', slice_urns) 
295     
296         return slice_urns
297         
298     def ListResources(self, api, creds, options={}):
299         call_id = options.get('call_id')
300         if Callids().already_handled(call_id): return ""
301         # get slice's hrn from options
302         xrn = options.get('geni_slice_urn', None)
303         cached = options.get('cached', True) 
304         (hrn, _) = urn_to_hrn(xrn)
305     
306         version_manager = VersionManager()
307         # get the rspec's return format from options
308         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
309         version_string = "rspec_%s" % (rspec_version)
310     
311         #panos adding the info option to the caching key (can be improved)
312         if options.get('info'):
313             version_string = version_string + "_"+options.get('info', 'default')
314     
315         # look in cache first
316         if self.caching and api.cache and not xrn and cached:
317             rspec = api.cache.get(version_string)
318             if rspec:
319                 api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
320                 return rspec 
321     
322         #panos: passing user-defined options
323         #print "manager options = ",options
324         aggregate = Aggregate(self.driver)
325         rspec =  aggregate.get_rspec(slice_xrn=xrn, version=rspec_version, options=options)
326     
327         # cache the result
328         if self.caching and api.cache and not xrn:
329             api.cache.add(version_string, rspec)
330     
331         return rspec
332     
333     
334     def GetTicket(self, api, xrn, creds, rspec, users, options={}):
335     
336         (slice_hrn, _) = urn_to_hrn(xrn)
337         slices = Slices(api)
338         peer = slices.get_peer(slice_hrn)
339         sfa_peer = slices.get_sfa_peer(slice_hrn)
340     
341         # get the slice record
342         credential = api.getCredential()
343         interface = api.registries[api.hrn]
344         registry = api.server_proxy(interface, credential)
345         records = registry.Resolve(xrn, credential)
346     
347         # make sure we get a local slice record
348         record = None
349         for tmp_record in records:
350             if tmp_record['type'] == 'slice' and \
351                not tmp_record['peer_authority']:
352     #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
353                 record = SliceRecord(dict=tmp_record)
354         if not record:
355             raise RecordNotFound(slice_hrn)
356         
357         # similar to CreateSliver, we must verify that the required records exist
358         # at this aggregate before we can issue a ticket
359         # parse rspec
360         rspec = RSpec(rspec_string)
361         requested_attributes = rspec.version.get_slice_attributes()
362     
363         # ensure site record exists
364         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
365         # ensure slice record exists
366         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
367         # ensure person records exists
368         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
369         # ensure slice attributes exists
370         slices.verify_slice_attributes(slice, requested_attributes)
371         
372         # get sliver info
373         slivers = slices.get_slivers(slice_hrn)
374     
375         if not slivers:
376             raise SliverDoesNotExist(slice_hrn)
377     
378         # get initscripts
379         initscripts = []
380         data = {
381             'timestamp': int(time.time()),
382             'initscripts': initscripts,
383             'slivers': slivers
384         }
385     
386         # create the ticket
387         object_gid = record.get_gid_object()
388         new_ticket = SfaTicket(subject = object_gid.get_subject())
389         new_ticket.set_gid_caller(api.auth.client_gid)
390         new_ticket.set_gid_object(object_gid)
391         new_ticket.set_issuer(key=api.key, subject=api.hrn)
392         new_ticket.set_pubkey(object_gid.get_pubkey())
393         new_ticket.set_attributes(data)
394         new_ticket.set_rspec(rspec)
395         #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
396         new_ticket.encode()
397         new_ticket.sign()
398     
399         return new_ticket.save_to_string(save_parents=True)