Merge branch 'master' of ssh://git.onelab.eu/git/sfa
[sfa.git] / sfa / openstack / osaggregate.py
1
2 import os
3 import socket
4 import base64
5 import string
6 import random    
7 from collections import defaultdict
8 from nova.exception import ImageNotFound
9 from nova.api.ec2.cloud import CloudController
10 from sfa.util.faults import SfaAPIError
11 from sfa.rspecs.rspec import RSpec
12 from sfa.rspecs.elements.hardware_type import HardwareType
13 from sfa.rspecs.elements.node import Node
14 from sfa.rspecs.elements.sliver import Sliver
15 from sfa.rspecs.elements.login import Login
16 from sfa.rspecs.elements.disk_image import DiskImage
17 from sfa.rspecs.elements.services import Services
18 from sfa.rspecs.elements.interface import Interface
19 from sfa.util.xrn import Xrn
20 from sfa.planetlab.plxrn import PlXrn 
21 from sfa.openstack.osxrn import OSXrn, hrn_to_os_slicename
22 from sfa.rspecs.version_manager import VersionManager
23 from sfa.openstack.image import ImageManager
24 from sfa.openstack.security_group import SecurityGroup
25 from sfa.util.sfalogging import logger
26
27 def instance_to_sliver(instance, slice_xrn=None):
28     # should include?
29     # * instance.image_ref
30     # * instance.kernel_id
31     # * instance.ramdisk_id
32     import nova.db.sqlalchemy.models
33     name=None
34     type=None
35     sliver_id = None
36     if isinstance(instance, dict):
37         # this is an isntance type dict
38         name = instance['name']
39         type = instance['name']
40     elif isinstance(instance, nova.db.sqlalchemy.models.Instance):
41         # this is an object that describes a running instance
42         name = instance.display_name
43         type = instance.instance_type.name
44     else:
45         raise SfaAPIError("instnace must be an instance_type dict or" + \
46                            " a nova.db.sqlalchemy.models.Instance object")
47     if slice_xrn:
48         xrn = Xrn(slice_xrn, 'slice')
49         sliver_id = xrn.get_sliver_id(instance.project_id, instance.hostname, instance.id)
50
51     sliver = Sliver({'slice_id': sliver_id,
52                      'name': name,
53                      'type':  type,
54                      'tags': []})
55     return sliver
56     
57
58 def ec2_id(id=None, type=None):
59     ec2_id = None
60     if type == 'ovf':
61         type = 'ami'   
62     if id and type:
63         ec2_id = CloudController.image_ec2_id(id, type)        
64     return ec2_id
65
66
67 class OSAggregate:
68
69     def __init__(self, driver):
70         self.driver = driver
71
72     def get_rspec(self, slice_xrn=None, version=None, options={}):
73         version_manager = VersionManager()
74         version = version_manager.get_version(version)
75         if not slice_xrn:
76             rspec_version = version_manager._get_version(version.type, version.version, 'ad')
77             nodes = self.get_aggregate_nodes()
78         else:
79             rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
80             nodes = self.get_slice_nodes(slice_xrn)
81         rspec = RSpec(version=rspec_version, user_options=options)
82         rspec.version.add_nodes(nodes)
83         return rspec.toxml()
84
85     def get_availability_zones(self):
86         try:
87             # pre essex releases 
88             zones = self.driver.shell.db.zone_get_all()
89         except:
90             # essex release
91             zones = self.driver.shell.db.dnsdomain_list()
92
93         if not zones:
94             zones = ['cloud']
95         else:
96             zones = [zone.name for zone in zones]
97         return zones
98
99     def get_slice_nodes(self, slice_xrn):
100         image_manager = ImageManager(self.driver)
101
102         zones = self.get_availability_zones()
103         name = hrn_to_os_slicename(slice_xrn)
104         instances = self.driver.shell.db.instance_get_all_by_project(name)
105         rspec_nodes = []
106         for instance in instances:
107             rspec_node = Node()
108             interfaces = []
109             for fixed_ip in instance.fixed_ips:
110                 if_xrn = PlXrn(auth=self.driver.hrn, 
111                                interface='node%s:eth0' % (instance.hostname)) 
112                 interface = Interface({'component_id': if_xrn.urn})
113                 interface['ips'] =  [{'address': fixed_ip['address'],
114                                      'netmask': fixed_ip['network'].netmask,
115                                      'type': 'ipv4'}]
116                 interfaces.append(interface)
117             if instance.availability_zone:
118                 node_xrn = OSXrn(instance.availability_zone, 'node')
119             else:
120                 node_xrn = OSXrn('cloud', 'node')
121
122             rspec_node['component_id'] = node_xrn.urn
123             rspec_node['component_name'] = node_xrn.name
124             rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()   
125             sliver = instance_to_sliver(instance)
126             disk_image = image_manager.get_disk_image(instance.image_ref)
127             sliver['disk_image'] = [disk_image.to_rspec_object()]
128             rspec_node['slivers'] = [sliver]
129             rspec_node['interfaces'] = interfaces
130             # slivers always provide the ssh service
131             hostname = None
132             for interface in interfaces:
133                 if 'ips' in interface and interface['ips'] and \
134                 isinstance(interface['ips'], list):
135                     if interface['ips'][0].get('address'):
136                         hostname = interface['ips'][0].get('address')
137                         break 
138             login = Login({'authentication': 'ssh-keys', 
139                            'hostname': hostname, 
140                            'port':'22', 'username': 'root'})
141             service = Services({'login': login})
142             rspec_node['services'] = [service] 
143             rspec_nodes.append(rspec_node)
144         return rspec_nodes
145
146     def get_aggregate_nodes(self):
147         zones = self.get_availability_zones()
148         # available sliver/instance/vm types
149         instances = self.driver.shell.db.instance_type_get_all().values()
150         # available images
151         image_manager = ImageManager(self.driver)
152         disk_images = image_manager.get_available_disk_images()
153         disk_image_objects = [image.to_rspec_object() \
154                                for image in disk_images]  
155         rspec_nodes = []
156         for zone in zones:
157             rspec_node = Node()
158             xrn = OSXrn(zone, 'node')
159             rspec_node['component_id'] = xrn.urn
160             rspec_node['component_name'] = xrn.name
161             rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
162             rspec_node['exclusive'] = 'false'
163             rspec_node['hardware_types'] = [HardwareType({'name': 'plos-pc'}),
164                                                 HardwareType({'name': 'pc'})]
165             slivers = []
166             for instance in instances:
167                 sliver = instance_to_sliver(instance)
168                 sliver['disk_image'] = disk_image_objects
169                 slivers.append(sliver)
170         
171             rspec_node['slivers'] = slivers
172             rspec_nodes.append(rspec_node) 
173
174         return rspec_nodes 
175
176
177     def create_project(self, slicename, users, options={}):
178         """
179         Create the slice if it doesn't alredy exist. Create user
180         accounts that don't already exist   
181         """
182         from nova.exception import ProjectNotFound, UserNotFound
183         for user in users:
184             username = Xrn(user['urn']).get_leaf()
185             try:
186                 self.driver.shell.auth_manager.get_user(username)
187             except UserNotFound:
188                 self.driver.shell.auth_manager.create_user(username)
189             self.verify_user_keys(username, user['keys'], options)
190
191         try:
192             slice = self.driver.shell.auth_manager.get_project(slicename)
193         except ProjectNotFound:
194             # assume that the first user is the project manager
195             proj_manager = Xrn(users[0]['urn']).get_leaf()
196             self.driver.shell.auth_manager.create_project(slicename, proj_manager) 
197
198     def verify_user_keys(self, username, keys, options={}):
199         """
200         Add requested keys.
201         """
202         append = options.get('append', True)    
203         existing_keys = self.driver.shell.db.key_pair_get_all_by_user(username)
204         existing_pub_keys = [key.public_key for key in existing_keys]
205         removed_pub_keys = set(existing_pub_keys).difference(keys)
206         added_pub_keys = set(keys).difference(existing_pub_keys)
207         pubkeys = []
208         # add new keys
209         for public_key in added_pub_keys:
210             key = {}
211             key['user_id'] = username
212             key['name'] =  username
213             key['public_key'] = public_key
214             self.driver.shell.db.key_pair_create(key)
215
216         # remove old keys
217         if not append:
218             for key in existing_keys:
219                 if key.public_key in removed_pub_keys:
220                     self.driver.shell.db.key_pair_destroy(username, key.name)
221
222
223     def create_security_group(self, slicename, fw_rules=[]):
224         # use default group by default
225         group_name = 'default' 
226         if isinstance(fw_rules, list) and fw_rules:
227             # Each sliver get's its own security group.
228             # Keep security group names unique by appending some random
229             # characters on end.
230             random_name = "".join([random.choice(string.letters+string.digits)
231                                            for i in xrange(6)])
232             group_name = slicename + random_name 
233             security_group = SecurityGroup(self.driver)
234             security_group.create_security_group(group_name)
235             for rule in fw_rules:
236                 security_group.add_rule_to_group(group_name, 
237                                              protocol = rule.get('protocol'), 
238                                              cidr_ip = rule.get('cidr_ip'), 
239                                              port_range = rule.get('port_range'), 
240                                              icmp_type_code = rule.get('icmp_type_code'))
241         return group_name
242
243     def add_rule_to_security_group(self, group_name, **kwds):
244         security_group = SecurityGroup(self.driver)
245         security_group.add_rule_to_group(group_name=group_name, 
246                                          protocol=kwds.get('protocol'), 
247                                          cidr_ip =kwds.get('cidr_ip'), 
248                                          icmp_type_code = kwds.get('icmp_type_code'))
249
250  
251     def reserve_instance(self, image_id, kernel_id, ramdisk_id, \
252                          instance_type, key_name, user_data, group_name):
253         conn  = self.driver.euca_shell.get_euca_connection()
254         logger.info('Reserving an instance: image: %s, kernel: ' \
255                     '%s, ramdisk: %s, type: %s, key: %s' % \
256                     (image_id, kernel_id, ramdisk_id,
257                     instance_type, key_name))
258         try:
259             reservation = conn.run_instances(image_id=image_id,
260                                              kernel_id=kernel_id,
261                                              ramdisk_id=ramdisk_id,
262                                              instance_type=instance_type,
263                                              key_name=key_name,
264                                              user_data = user_data,
265                                              security_groups=[group_name])
266                                              #placement=zone,
267                                              #min_count=min_count,
268                                              #max_count=max_count,           
269                                               
270         except Exception, err:
271             logger.log_exc(err)
272     
273                
274     def run_instances(self, slicename, rspec, keyname, pubkeys):
275         """
276         Create the security groups and instances. 
277         """
278         # the default image to use for instnaces that dont
279         # explicitly request an image.
280         # Just choose the first available image for now.
281         image_manager = ImageManager(self.driver)
282         available_images = image_manager.get_available_disk_images()
283         default_image_id = None
284         default_aki_id  = None
285         default_ari_id = None
286         default_image = available_images[0]
287         default_image_id = ec2_id(default_image.id, default_image.container_format)  
288         default_aki_id = ec2_id(default_image.kernel_id, 'aki')  
289         default_ari_id = ec2_id(default_image.ramdisk_id, 'ari')
290
291         # get requested slivers
292         rspec = RSpec(rspec)
293         user_data = "\n".join(pubkeys)
294         requested_instances = defaultdict(list)
295         # iterate over clouds/zones/nodes
296         for node in rspec.version.get_nodes_with_slivers():
297             instance_types = node.get('slivers', [])
298             if isinstance(instance_types, list):
299                 # iterate over sliver/instance types
300                 for instance_type in instance_types:
301                     fw_rules = instance_type.get('fw_rules', [])
302                     group_name = self.create_security_group(slicename, fw_rules)
303                     ami_id = default_image_id
304                     aki_id = default_aki_id
305                     ari_id = default_ari_id
306                     req_image = instance_type.get('disk_image')
307                     if req_image and isinstance(req_image, list):
308                         req_image_name = req_image[0]['name']
309                         disk_image = image_manager.get_disk_image(name=req_image_name)
310                         if disk_image:
311                             ami_id = ec2_id(disk_image.id, disk_image.container_format)
312                             aki_id = ec2_id(disk_image.kernel_id, 'aki')
313                             ari_id = ec2_id(disk_image.ramdisk_id, 'ari')
314                     # start the instance
315                     self.reserve_instance(image_id=ami_id, 
316                                           kernel_id=aki_id, 
317                                           ramdisk_id=ari_id, 
318                                           instance_type=instance_type['name'], 
319                                           key_name=keyname, 
320                                           user_data=user_data, 
321                                           group_name=group_name)
322
323
324     def delete_instances(self, project_name):
325         instances = self.driver.shell.db.instance_get_all_by_project(project_name)
326         security_group_manager = SecurityGroup(self.driver)
327         for instance in instances:
328             # deleate this instance's security groups
329             for security_group in instance.security_groups:
330                 # dont delete the default security group
331                 if security_group.name != 'default': 
332                     security_group_manager.delete_security_group(security_group.name)
333             # destroy instance
334             self.driver.shell.db.instance_destroy(instance.id)
335         return 1
336
337     def stop_instances(self, project_name):
338         instances = self.driver.shell.db.instance_get_all_by_project(project_name)
339         for instance in instances:
340             self.driver.shell.db.instance_stop(instance.id)
341         return 1
342
343     def update_instances(self, project_name):
344         pass