Bug fix
[sfa.git] / sfa / openstack / osaggregate.py
1
2 import os
3 import socket
4 import base64
5 import string
6 import random    
7 from collections import defaultdict
8 from nova.exception import ImageNotFound
9 from nova.api.ec2.cloud import CloudController
10 from sfa.util.faults import SfaAPIError
11 from sfa.rspecs.rspec import RSpec
12 from sfa.rspecs.elements.hardware_type import HardwareType
13 from sfa.rspecs.elements.node import Node
14 from sfa.rspecs.elements.sliver import Sliver
15 from sfa.rspecs.elements.login import Login
16 from sfa.rspecs.elements.disk_image import DiskImage
17 from sfa.rspecs.elements.services import Services
18 from sfa.rspecs.elements.interface import Interface
19 from sfa.util.xrn import Xrn
20 from sfa.util.plxrn import PlXrn, hrn_to_pl_slicename
21 from sfa.util.osxrn import OSXrn
22 from sfa.rspecs.version_manager import VersionManager
23 from sfa.openstack.image import ImageManager
24 from sfa.openstack.security_group import SecurityGroup
25 from sfa.util.sfalogging import logger
26
27 def instance_to_sliver(instance, slice_xrn=None):
28     # should include?
29     # * instance.image_ref
30     # * instance.kernel_id
31     # * instance.ramdisk_id
32     import nova.db.sqlalchemy.models
33     name=None
34     type=None
35     sliver_id = None
36     if isinstance(instance, dict):
37         # this is an isntance type dict
38         name = instance['name']
39         type = instance['name']
40     elif isinstance(instance, nova.db.sqlalchemy.models.Instance):
41         # this is an object that describes a running instance
42         name = instance.display_name
43         type = instance.instance_type.name
44     else:
45         raise SfaAPIError("instnace must be an instance_type dict or" + \
46                            " a nova.db.sqlalchemy.models.Instance object")
47     if slice_xrn:
48         xrn = Xrn(slice_xrn, 'slice')
49         sliver_id = xrn.get_sliver_id(instance.project_id, instance.hostname, instance.id)
50
51     sliver = Sliver({'slice_id': sliver_id,
52                      'name': name,
53                      'type':  type,
54                      'tags': []})
55     return sliver
56     
57
58 def ec2_id(id=None, type=None):
59     ec2_id = None
60     if type == 'ovf':
61         type = 'ami'   
62     if id and type:
63         ec2_id = CloudController.image_ec2_id(id, type)        
64     return ec2_id
65
66
67 class OSAggregate:
68
69     def __init__(self, driver):
70         self.driver = driver
71
72     def get_rspec(self, slice_xrn=None, version=None, options={}):
73         version_manager = VersionManager()
74         version = version_manager.get_version(version)
75         if not slice_xrn:
76             rspec_version = version_manager._get_version(version.type, version.version, 'ad')
77             nodes = self.get_aggregate_nodes()
78         else:
79             rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
80             nodes = self.get_slice_nodes(slice_xrn)
81         rspec = RSpec(version=rspec_version, user_options=options)
82         rspec.version.add_nodes(nodes)
83         return rspec.toxml()
84
85     def get_availability_zones(self):
86         zones = self.driver.shell.db.zone_get_all()
87         if not zones:
88             zones = ['cloud']
89         else:
90             zones = [zone.name for zone in zones]
91
92     def get_slice_nodes(self, slice_xrn):
93         image_manager = ImageManager(self.driver)
94
95         zones = self.get_availability_zones()
96         name = hrn_to_pl_slicename(slice_xrn)
97         instances = self.driver.shell.db.instance_get_all_by_project(name)
98         rspec_nodes = []
99         for instance in instances:
100             rspec_node = Node()
101             interfaces = []
102             for fixed_ip in instance.fixed_ips:
103                 if_xrn = PlXrn(auth=self.driver.hrn, 
104                                interface='node%s:eth0' % (instance.hostname)) 
105                 interface = Interface({'component_id': if_xrn.urn})
106                 interface['ips'] =  [{'address': fixed_ip['address'],
107                                      'netmask': fixed_ip['network'].netmask,
108                                      'type': 'ipv4'}]
109                 interfaces.append(interface)
110             if instance.availability_zone:
111                 node_xrn = OSXrn(instance.availability_zone, 'node')
112             else:
113                 node_xrn = OSXrn('cloud', 'node')
114
115             rspec_node['component_id'] = node_xrn.urn
116             rspec_node['component_name'] = node_xrn.name
117             rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()   
118             sliver = instance_to_sliver(instance)
119             disk_image = image_manager.get_disk_image(instance.image_ref)
120             sliver['disk_image'] = [disk_image.to_rspec_object()]
121             rspec_node['slivers'] = [sliver]
122             rspec_node['interfaces'] = interfaces
123             # slivers always provide the ssh service
124             hostname = None
125             for interface in interfaces:
126                 if 'ips' in interface and interface['ips'] and \
127                 isinstance(interface['ips'], list):
128                     if interface['ips'][0].get('address'):
129                         hostname = interface['ips'][0].get('address')
130                         break 
131             login = Login({'authentication': 'ssh-keys', 
132                            'hostname': hostname, 
133                            'port':'22', 'username': 'root'})
134             service = Services({'login': login})
135             rspec_node['services'] = [service] 
136             rspec_nodes.append(rspec_node)
137         return rspec_nodes
138
139     def get_aggregate_nodes(self):
140         zones = self.get_availability_zones()
141         # available sliver/instance/vm types
142         instances = self.driver.shell.db.instance_type_get_all().values()
143         # available images
144         image_manager = ImageManager(self.driver)
145         disk_images = image_manager.get_available_disk_images()
146         disk_image_objects = [image.to_rspec_object() \
147                                for image in disk_images]  
148         rspec_nodes = []
149         for zone in zones:
150             rspec_node = Node()
151             xrn = OSXrn(zone, 'node')
152             rspec_node['component_id'] = xrn.urn
153             rspec_node['component_name'] = xrn.name
154             rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
155             rspec_node['exclusive'] = 'false'
156             rspec_node['hardware_types'] = [HardwareType({'name': 'plos-pc'}),
157                                                 HardwareType({'name': 'pc'})]
158             slivers = []
159             for instance in instances:
160                 sliver = instance_to_sliver(instance)
161                 sliver['disk_image'] = disk_image_objects
162                 slivers.append(sliver)
163         
164             rspec_node['slivers'] = slivers
165             rspec_nodes.append(rspec_node) 
166
167         return rspec_nodes 
168
169
170     def create_project(self, slicename, users, options={}):
171         """
172         Create the slice if it doesn't alredy exist. Create user
173         accounts that don't already exist   
174         """
175         from nova.exception import ProjectNotFound, UserNotFound
176         for user in users:
177             username = Xrn(user['urn']).get_leaf()
178             try:
179                 self.driver.shell.auth_manager.get_user(username)
180             except UserNotFound:
181                 self.driver.shell.auth_manager.create_user(username)
182             self.verify_user_keys(username, user['keys'], options)
183
184         try:
185             slice = self.driver.shell.auth_manager.get_project(slicename)
186         except ProjectNotFound:
187             # assume that the first user is the project manager
188             proj_manager = Xrn(users[0]['urn']).get_leaf()
189             self.driver.shell.auth_manager.create_project(slicename, proj_manager) 
190
191     def verify_user_keys(self, username, keys, options={}):
192         """
193         Add requested keys.
194         """
195         append = options.get('append', True)    
196         existing_keys = self.driver.shell.db.key_pair_get_all_by_user(username)
197         existing_pub_keys = [key.public_key for key in existing_keys]
198         removed_pub_keys = set(existing_pub_keys).difference(keys)
199         added_pub_keys = set(keys).difference(existing_pub_keys)
200         pubkeys = []
201         # add new keys
202         for public_key in added_pub_keys:
203             key = {}
204             key['user_id'] = username
205             key['name'] =  username
206             key['public_key'] = public_key
207             self.driver.shell.db.key_pair_create(key)
208
209         # remove old keys
210         if not append:
211             for key in existing_keys:
212                 if key.public_key in removed_pub_keys:
213                     self.driver.shell.db.key_pair_destroy(username, key.name)
214
215
216     def create_security_group(self, slicename, fw_rules=[]):
217         # use default group by default
218         group_name = 'default' 
219         if isinstance(fw_rules, list) and fw_rules:
220             # Each sliver get's its own security group.
221             # Keep security group names unique by appending some random
222             # characters on end.
223             random_name = "".join([random.choice(string.letters+string.digits)
224                                            for i in xrange(6)])
225             group_name = slicename + random_name 
226             security_group = SecurityGroup(self.driver)
227             security_group.create_security_group(group_name)
228             for rule in fw_rules:
229                 security_group.add_rule_to_group(group_name, 
230                                              protocol = rule.get('protocol'), 
231                                              cidr_ip = rule.get('cidr_ip'), 
232                                              port_range = rule.get('port_range'), 
233                                              icmp_type_code = rule.get('icmp_type_code'))
234         return group_name
235
236     def add_rule_to_security_group(self, group_name, **kwds):
237         security_group = SecurityGroup(self.driver)
238         security_group.add_rule_to_group(group_name=group_name, 
239                                          protocol=kwds.get('protocol'), 
240                                          cidr_ip =kwds.get('cidr_ip'), 
241                                          icmp_type_code = kwds.get('icmp_type_code'))
242
243  
244     def reserve_instance(self, image_id, kernel_id, ramdisk_id, \
245                          instance_type, key_name, user_data, group_name):
246         conn  = self.driver.euca_shell.get_euca_connection()
247         logger.info('Reserving an instance: image: %s, kernel: ' \
248                     '%s, ramdisk: %s, type: %s, key: %s' % \
249                     (image_id, kernel_id, ramdisk_id,
250                     instance_type, key_name))
251         try:
252             reservation = conn.run_instances(image_id=image_id,
253                                              kernel_id=kernel_id,
254                                              ramdisk_id=ramdisk_id,
255                                              instance_type=instance_type,
256                                              key_name=key_name,
257                                              user_data = user_data,
258                                              security_groups=[group_name])
259                                              #placement=zone,
260                                              #min_count=min_count,
261                                              #max_count=max_count,           
262                                               
263         except Exception, err:
264             logger.log_exc(err)
265     
266                
267     def run_instances(self, slicename, rspec, keyname, pubkeys):
268         """
269         Create the security groups and instances. 
270         """
271         # the default image to use for instnaces that dont
272         # explicitly request an image.
273         # Just choose the first available image for now.
274         image_manager = ImageManager(self.driver)
275         available_images = image_manager.get_available_disk_images()
276         default_image_id = None
277         default_aki_id  = None
278         default_ari_id = None
279         default_image = available_images[0]
280         default_image_id = ec2_id(default_image.id, default_image.container_format)  
281         default_aki_id = ec2_id(default_image.kernel_id, 'aki')  
282         default_ari_id = ec2_id(default_image.ramdisk_id, 'ari')
283
284         # get requested slivers
285         rspec = RSpec(rspec)
286         user_data = "\n".join(pubkeys)
287         requested_instances = defaultdict(list)
288         # iterate over clouds/zones/nodes
289         for node in rspec.version.get_nodes_with_slivers():
290             instance_types = node.get('slivers', [])
291             if isinstance(instance_types, list):
292                 # iterate over sliver/instance types
293                 for instance_type in instance_types:
294                     fw_rules = instance_type.get('fw_rules', [])
295                     group_name = self.create_security_group(slicename, fw_rules)
296                     ami_id = default_image_id
297                     aki_id = default_aki_id
298                     ari_id = default_ari_id
299                     req_image = instance_type.get('disk_image')
300                     if req_image and isinstance(req_image, list):
301                         req_image_name = req_image[0]['name']
302                         disk_image = image_manager.get_disk_image(name=req_image_name)
303                         if disk_image:
304                             ami_id = ec2_id(disk_image.id, disk_image.container_format)
305                             aki_id = ec2_id(disk_image.kernel_id, 'aki')
306                             ari_id = ec2_id(disk_image.ramdisk_id, 'ari')
307                     # start the instance
308                     self.reserve_instance(image_id=ami_id, 
309                                           kernel_id=aki_id, 
310                                           ramdisk_id=ari_id, 
311                                           instance_type=instance_type['name'], 
312                                           key_name=keyname, 
313                                           user_data=user_data, 
314                                           group_name=group_name)
315
316
317     def delete_instances(self, project_name):
318         instances = self.driver.shell.db.instance_get_all_by_project(project_name)
319         security_group_manager = SecurityGroup(self.driver)
320         for instance in instances:
321             # deleate this instance's security groups
322             for security_group in instance.security_groups:
323                 # dont delete the default security group
324                 if security_group.name != 'default': 
325                     security_group_manager.delete_security_group(security_group.name)
326             # destroy instance
327             self.driver.shell.db.instance_destroy(instance.id)
328         return 1
329
330     def stop_instances(self, project_name):
331         instances = self.driver.shell.db.instance_get_all_by_project(project_name)
332         for instance in instances:
333             self.driver.shell.db.instance_stop(instance.id)
334         return 1
335
336     def update_instances(self, project_name):
337         pass