added pubkeys_to_user_data()
[sfa.git] / sfa / openstack / osaggregate.py
1
2 import os
3 import socket
4 import base64
5 import string
6 import random    
7 from collections import defaultdict
8 from nova.exception import ImageNotFound
9 from nova.api.ec2.cloud import CloudController
10 from sfa.util.faults import SfaAPIError
11 from sfa.rspecs.rspec import RSpec
12 from sfa.rspecs.elements.hardware_type import HardwareType
13 from sfa.rspecs.elements.node import Node
14 from sfa.rspecs.elements.sliver import Sliver
15 from sfa.rspecs.elements.login import Login
16 from sfa.rspecs.elements.disk_image import DiskImage
17 from sfa.rspecs.elements.services import Services
18 from sfa.rspecs.elements.interface import Interface
19 from sfa.util.xrn import Xrn
20 from sfa.planetlab.plxrn import PlXrn 
21 from sfa.openstack.osxrn import OSXrn, hrn_to_os_slicename
22 from sfa.rspecs.version_manager import VersionManager
23 from sfa.openstack.image import ImageManager
24 from sfa.openstack.security_group import SecurityGroup
25 from sfa.util.sfalogging import logger
26
def pubkeys_to_user_data(pubkeys):
    """
    Build a bash user-data script that appends ssh public keys to
    /root/.ssh/authorized_keys.

    pubkeys: iterable of public key strings. Line breaks inside a key
             are removed before the key is written.

    Returns the script text: a bash shebang, a blank line, then for
    every key one command that appends the key and one command that
    appends an empty line. Every command sits on its own line.
    """
    lines = ["#!/bin/bash", ""]
    for pubkey in pubkeys:
        pubkey = pubkey.replace('\r', '').replace('\n', '')
        # single-quote the key so shell metacharacters in a user supplied
        # key are written literally instead of being interpreted
        quoted = "'%s'" % pubkey.replace("'", "'\\''")
        lines.append("echo %s >> /root/.ssh/authorized_keys" % quoted)
        lines.append("echo >> /root/.ssh/authorized_keys")
    return "\n".join(lines) + "\n"
34
def instance_to_sliver(instance, slice_xrn=None):
    """
    Convert an instance type dict or a running nova instance into a Sliver.

    instance:  either an instance type dict (its 'name' entry is used for
               both the sliver name and the sliver type) or a
               nova.db.sqlalchemy.models.Instance object (display_name and
               instance_type.name are used).
    slice_xrn: optional slice xrn. A sliver id is only built when this is
               given AND instance is a running Instance object; an instance
               type dict has no project_id/hostname/id to build one from.

    Returns a Sliver element with an empty 'tags' list.
    Raises SfaAPIError when instance is neither of the supported kinds.
    """
    # should include?
    # * instance.image_ref
    # * instance.kernel_id
    # * instance.ramdisk_id
    import nova.db.sqlalchemy.models
    sliver_id = None
    if isinstance(instance, dict):
        # this is an instance type dict
        name = instance['name']
        sliver_type = instance['name']
    elif isinstance(instance, nova.db.sqlalchemy.models.Instance):
        # this is an object that describes a running instance
        name = instance.display_name
        sliver_type = instance.instance_type.name
        if slice_xrn:
            xrn = Xrn(slice_xrn, 'slice')
            sliver_id = xrn.get_sliver_id(instance.project_id,
                                          instance.hostname,
                                          instance.id)
    else:
        raise SfaAPIError("instance must be an instance_type dict or" + \
                           " a nova.db.sqlalchemy.models.Instance object")

    # NOTE(review): the id is stored under the key 'slice_id', as in the
    # original code - confirm that this is the key Sliver expects.
    sliver = Sliver({'slice_id': sliver_id,
                     'name': name,
                     'type': sliver_type,
                     'tags': []})
    return sliver
64     
65
def ec2_id(id=None, type=None):
    """
    Translate an image id and image type into an ec2 style image id.

    An image type of 'ovf' is treated as 'ami'. Returns None when either
    the id or the type is missing (falsy); otherwise returns whatever
    CloudController.image_ec2_id produces for the pair.
    """
    if not (id and type):
        return None
    image_type = 'ami' if type == 'ovf' else type
    return CloudController.image_ec2_id(id, image_type)
73
74
class OSAggregate:
    """
    Describes an OpenStack (nova) aggregate as rspecs and manages the
    projects, user keys, security groups and instances behind a slice.
    """

    def __init__(self, driver):
        """
        driver: object used by the methods below through driver.shell
                (nova db and auth manager), driver.euca_shell and
                driver.hrn.
        """
        self.driver = driver

    def get_rspec(self, slice_xrn=None, version=None, options=None):
        """
        Return the rspec xml for the aggregate or for a single slice.

        Without slice_xrn an advertisement ('ad') rspec listing the
        aggregate nodes is produced; with slice_xrn a 'manifest' rspec
        listing the nodes of that slice is produced.

        options is handed to the RSpec as user_options (defaults to {}).
        """
        if options is None:
            options = {}
        version_manager = VersionManager()
        version = version_manager.get_version(version)
        if not slice_xrn:
            rspec_version = version_manager._get_version(version.type, version.version, 'ad')
            nodes = self.get_aggregate_nodes()
        else:
            rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
            nodes = self.get_slice_nodes(slice_xrn)
        rspec = RSpec(version=rspec_version, user_options=options)
        rspec.version.add_nodes(nodes)
        return rspec.toxml()

    def get_availability_zones(self):
        """
        Return the list of zone names, or ['cloud'] when none are found.
        """
        try:
            # pre essex releases
            zones = self.driver.shell.db.zone_get_all()
        except Exception:
            # essex release
            zones = self.driver.shell.db.dnsdomain_list()

        if not zones:
            return ['cloud']
        return [zone.name for zone in zones]

    def get_slice_nodes(self, slice_xrn):
        """
        Return one rspec Node per instance running in the slice's project.

        Each node carries its interfaces (one per fixed ip, including the
        floating ips attached to it), a single sliver with its disk image,
        and one ssh login service per floating ip.
        """
        image_manager = ImageManager(self.driver)

        name = hrn_to_os_slicename(slice_xrn)
        instances = self.driver.shell.db.instance_get_all_by_project(name)
        rspec_nodes = []
        for instance in instances:
            rspec_node = Node()
            interfaces = []
            for fixed_ip in instance.fixed_ips:
                if_xrn = PlXrn(auth=self.driver.hrn,
                               interface='node%s:eth0' % (instance.hostname))
                interface = Interface({'component_id': if_xrn.urn})
                interface['ips'] = [{'address': fixed_ip['address'],
                                     'netmask': fixed_ip['network'].netmask,
                                     'type': 'ipv4'}]
                interface['floating_ips'] = [floating_ip.address
                                             for floating_ip in fixed_ip.floating_ips]
                interfaces.append(interface)
            # instances without an availability zone are reported under
            # the generic 'cloud' node
            if instance.availability_zone:
                node_xrn = OSXrn(instance.availability_zone, 'node')
            else:
                node_xrn = OSXrn('cloud', 'node')

            rspec_node['component_id'] = node_xrn.urn
            rspec_node['component_name'] = node_xrn.name
            rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
            sliver = instance_to_sliver(instance)
            disk_image = image_manager.get_disk_image(instance.image_ref)
            sliver['disk_image'] = [disk_image.to_rspec_object()]
            rspec_node['slivers'] = [sliver]
            rspec_node['interfaces'] = interfaces
            # slivers always provide the ssh service
            rspec_node['services'] = []
            for interface in interfaces:
                for hostname in interface['floating_ips']:
                    login = Login({'authentication': 'ssh-keys',
                                   'hostname': hostname,
                                   'port': '22', 'username': 'root'})
                    rspec_node['services'].append(Services({'login': login}))
            rspec_nodes.append(rspec_node)
        return rspec_nodes

    def get_aggregate_nodes(self):
        """
        Return one rspec Node per availability zone.

        Every node advertises one sliver per available instance type, and
        every sliver lists all available disk images.
        """
        zones = self.get_availability_zones()
        # available sliver/instance/vm types
        instances = self.driver.shell.db.instance_type_get_all()
        if isinstance(instances, dict):
            instances = instances.values()
        # available images
        image_manager = ImageManager(self.driver)
        disk_images = image_manager.get_available_disk_images()
        disk_image_objects = [image.to_rspec_object()
                              for image in disk_images]
        rspec_nodes = []
        for zone in zones:
            rspec_node = Node()
            xrn = OSXrn(zone, 'node')
            rspec_node['component_id'] = xrn.urn
            rspec_node['component_name'] = xrn.name
            rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
            rspec_node['exclusive'] = 'false'
            rspec_node['hardware_types'] = [HardwareType({'name': 'plos-pc'}),
                                            HardwareType({'name': 'pc'})]
            slivers = []
            for instance in instances:
                sliver = instance_to_sliver(instance)
                sliver['disk_image'] = disk_image_objects
                slivers.append(sliver)

            rspec_node['slivers'] = slivers
            rspec_nodes.append(rspec_node)

        return rspec_nodes

    def create_project(self, slicename, users, options=None):
        """
        Create the slice's project if it doesn't already exist. Create user
        accounts that don't already exist and sync their keys.

        users:   list of dicts with a 'urn' and a 'keys' entry. The first
                 user becomes the project manager of a new project.
        options: passed on to verify_user_keys.

        Raises SfaAPIError when the project has to be created but no
        users were given.
        """
        from nova.exception import ProjectNotFound, UserNotFound
        if options is None:
            options = {}
        for user in users:
            username = Xrn(user['urn']).get_leaf()
            try:
                self.driver.shell.auth_manager.get_user(username)
            except UserNotFound:
                self.driver.shell.auth_manager.create_user(username)
            self.verify_user_keys(username, user['keys'], options)

        try:
            self.driver.shell.auth_manager.get_project(slicename)
        except ProjectNotFound:
            if not users:
                raise SfaAPIError("cannot create project %s: no users given"
                                  " to act as project manager" % slicename)
            # assume that the first user is the project manager
            proj_manager = Xrn(users[0]['urn']).get_leaf()
            self.driver.shell.auth_manager.create_project(slicename, proj_manager)

    def verify_user_keys(self, username, keys, options=None):
        """
        Add requested keys.

        Keys in `keys` that the user does not have yet are created (the
        key pair name is the username). Unless options['append'] is true
        (the default), existing keys that are not in `keys` are destroyed.
        """
        if options is None:
            options = {}
        append = options.get('append', True)
        existing_keys = self.driver.shell.db.key_pair_get_all_by_user(username)
        existing_pub_keys = [key.public_key for key in existing_keys]
        removed_pub_keys = set(existing_pub_keys).difference(keys)
        added_pub_keys = set(keys).difference(existing_pub_keys)
        # add new keys
        for public_key in added_pub_keys:
            key = {'user_id': username,
                   'name': username,
                   'public_key': public_key}
            self.driver.shell.db.key_pair_create(key)

        # remove old keys
        if not append:
            for key in existing_keys:
                if key.public_key in removed_pub_keys:
                    self.driver.shell.db.key_pair_destroy(username, key.name)

    def create_security_group(self, slicename, fw_rules=None):
        """
        Return the name of the security group a new sliver should use.

        Without firewall rules the 'default' group is used. With a non
        empty list of rules a new group named after the slice is created
        and every rule is added to it.
        """
        # use default group by default
        group_name = 'default'
        if isinstance(fw_rules, list) and fw_rules:
            # Each sliver gets its own security group.
            # Keep security group names unique by appending some random
            # characters on end. ascii_letters (unlike string.letters) does
            # not depend on the locale.
            random_name = "".join([random.choice(string.ascii_letters + string.digits)
                                   for i in range(6)])
            group_name = slicename + random_name
            security_group = SecurityGroup(self.driver)
            security_group.create_security_group(group_name)
            for rule in fw_rules:
                security_group.add_rule_to_group(group_name,
                                                 protocol=rule.get('protocol'),
                                                 cidr_ip=rule.get('cidr_ip'),
                                                 port_range=rule.get('port_range'),
                                                 icmp_type_code=rule.get('icmp_type_code'))
        return group_name

    def add_rule_to_security_group(self, group_name, **kwds):
        """
        Add one rule to an existing security group.

        Recognized keywords: protocol, cidr_ip, port_range and
        icmp_type_code; missing ones are passed on as None.
        """
        security_group = SecurityGroup(self.driver)
        security_group.add_rule_to_group(group_name=group_name,
                                         protocol=kwds.get('protocol'),
                                         cidr_ip=kwds.get('cidr_ip'),
                                         port_range=kwds.get('port_range'),
                                         icmp_type_code=kwds.get('icmp_type_code'))

    def reserve_instance(self, image_id, kernel_id, ramdisk_id, \
                         instance_type, key_name, user_data, group_name):
        """
        Ask the euca connection to start one instance.

        Failures are logged and not re-raised, so one failing request does
        not abort the caller.
        """
        conn = self.driver.euca_shell.get_euca_connection()
        logger.info('Reserving an instance: image: %s, kernel: ' \
                    '%s, ramdisk: %s, type: %s, key: %s' % \
                    (image_id, kernel_id, ramdisk_id,
                    instance_type, key_name))
        try:
            conn.run_instances(image_id=image_id,
                               kernel_id=kernel_id,
                               ramdisk_id=ramdisk_id,
                               instance_type=instance_type,
                               key_name=key_name,
                               user_data=user_data,
                               security_groups=[group_name])
        except Exception as err:
            logger.log_exc(err)

    def run_instances(self, slicename, rspec, keyname, pubkeys):
        """
        Create the security groups and instances.

        rspec:   request rspec; one instance is reserved for every sliver
                 of every node returned by get_nodes_with_slivers().
        keyname: key pair name passed to every reservation.
        pubkeys: public keys installed through the user-data script.

        Raises SfaAPIError when no disk image is available at all.
        """
        # the default image to use for instances that dont
        # explicitly request an image.
        # Just choose the first available image for now.
        image_manager = ImageManager(self.driver)
        available_images = image_manager.get_available_disk_images()
        if not available_images:
            raise SfaAPIError("cannot run instances for %s: no disk images"
                              " available" % slicename)
        default_image = available_images[0]
        default_image_id = ec2_id(default_image.id, default_image.container_format)
        default_aki_id = ec2_id(default_image.kernel_id, 'aki')
        default_ari_id = ec2_id(default_image.ramdisk_id, 'ari')

        # get requested slivers
        rspec = RSpec(rspec)
        user_data = pubkeys_to_user_data(pubkeys)
        # iterate over clouds/zones/nodes
        for node in rspec.version.get_nodes_with_slivers():
            instance_types = node.get('slivers', [])
            if not isinstance(instance_types, list):
                continue
            # iterate over sliver/instance types
            for instance_type in instance_types:
                fw_rules = instance_type.get('fw_rules', [])
                group_name = self.create_security_group(slicename, fw_rules)
                ami_id = default_image_id
                aki_id = default_aki_id
                ari_id = default_ari_id
                req_image = instance_type.get('disk_image')
                if req_image and isinstance(req_image, list):
                    req_image_name = req_image[0]['name']
                    disk_image = image_manager.get_disk_image(name=req_image_name)
                    # fall back to the default image when the requested
                    # one cannot be found
                    if disk_image:
                        ami_id = ec2_id(disk_image.id, disk_image.container_format)
                        aki_id = ec2_id(disk_image.kernel_id, 'aki')
                        ari_id = ec2_id(disk_image.ramdisk_id, 'ari')
                # start the instance
                self.reserve_instance(image_id=ami_id,
                                      kernel_id=aki_id,
                                      ramdisk_id=ari_id,
                                      instance_type=instance_type['name'],
                                      key_name=keyname,
                                      user_data=user_data,
                                      group_name=group_name)

    def delete_instances(self, project_name):
        """
        Destroy every instance of the project together with its non
        default security groups. Returns 1.
        """
        instances = self.driver.shell.db.instance_get_all_by_project(project_name)
        security_group_manager = SecurityGroup(self.driver)
        for instance in instances:
            # delete this instance's security groups
            for security_group in instance.security_groups:
                # dont delete the default security group
                if security_group.name != 'default':
                    security_group_manager.delete_security_group(security_group.name)
            # destroy instance
            self.driver.shell.db.instance_destroy(instance.id)
        return 1

    def stop_instances(self, project_name):
        """
        Stop every instance of the project. Returns 1.
        """
        instances = self.driver.shell.db.instance_get_all_by_project(project_name)
        for instance in instances:
            self.driver.shell.db.instance_stop(instance.id)
        return 1

    def update_instances(self, project_name):
        """
        Placeholder; currently does nothing.
        """
        pass