Implement create_slice
[sfa.git] / sfa / managers / aggregate_manager_eucalyptus.py
1 from __future__ import with_statement 
2 from sfa.util.faults import *
3 from sfa.util.namespace import *
4 from sfa.util.rspec import RSpec
5 from sfa.server.registry import Registries
6 from sfa.plc.nodes import *
7
8 import boto
9 from boto.ec2.regioninfo import RegionInfo
10 from boto.exception import EC2ResponseError
11 from ConfigParser import ConfigParser
12 from xmlbuilder import XMLBuilder
13 from xml.etree import ElementTree as ET
14 from sqlobject import *
15
16 import sys
17 import os
18
19 ##
20 # The data structure used to represent a cloud.
21 # It contains the cloud name, its ip address, image information,
22 # key pairs, and clusters information.
23 #
24 cloud = {}
25
26 ##
27 # A representation of an Eucalyptus instance. This is a support class
28 # for instance <-> slice mapping.
29 #
30 class EucaInstance(SQLObject):
31     instance_id = StringCol(unique=True, default=None)
32     kernel_id   = StringCol()
33     image_id    = StringCol()
34     ramdisk_id  = StringCol()
35     inst_type   = StringCol()
36     key_pair    = StringCol()
37     slice = ForeignKey('Slice')
38
39     ##
40     # Contacts Eucalyptus and tries to reserve this instance.
41     # 
42     # @param botoConn A connection to Eucalyptus.
43     #
44     def reserveInstance(self, botoConn):
45         print >>sys.stderr, 'Reserving an instance. image: %s, kernel: %s, ramdisk: %s, type: %s, key: %s' % \
46                             (self.image_id, self.kernel_id, self.ramdisk_id, self.inst_type, self.key_pair)
47
48         try:
49             reservation = botoConn.run_instances(self.image_id,
50                                                  kernel_id = self.kernel_id,
51                                                  ramdisk_id = self.ramdisk_id,
52                                                  instance_type = self.inst_type,
53                                                  key_name  = self.key_pair)
54             for instance in reservation.instances:
55                 self.instance_id = instance.id
56
57         # If there is an error, destroy itself.
58         except EC2ResponseError, ec2RespErr:
59             errTree = ET.fromstring(ec2RespErr.body)
60             msg = errTree.find('.//Message')
61             print >>sys.stderr, msg.text
62             self.destroySelf()
63
64 ##
65 # A representation of a PlanetLab slice. This is a support class
66 # for instance <-> slice mapping.
67 #
68 class Slice(SQLObject):
69     slice_hrn = StringCol()
70     #slice_index = DatabaseIndex('slice_hrn')
71     instances = MultipleJoin('EucaInstance')
72
73 ##
74 # Initialize the aggregate manager by reading a configuration file.
75 #
76 def init_server():
77     configParser = ConfigParser()
78     configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
79     if len(configParser.sections()) < 1:
80         print >>sys.stderr, 'No cloud defined in the config file'
81         raise 'Cannot find cloud definition in configuration file.'
82
83     # Only read the first section.
84     cloudSec = configParser.sections()[0]
85     cloud['name'] = cloudSec
86     cloud['access_key'] = configParser.get(cloudSec, 'access_key')
87     cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
88     cloud['cloud_url']  = configParser.get(cloudSec, 'cloud_url')
89     cloudURL = cloud['cloud_url']
90     if cloudURL.find('https://') >= 0:
91         cloudURL = cloudURL.replace('https://', '')
92     elif cloudURL.find('http://') >= 0:
93         cloudURL = cloudURL.replace('http://', '')
94     (cloud['ip'], parts) = cloudURL.split(':')
95
96     # Initialize sqlite3 database.
97     dbPath = '/etc/sfa/db'
98     dbName = 'euca_aggregate.db'
99
100     if not os.path.isdir(dbPath):
101         print >>sys.stderr, '%s not found. Creating directory ...' % dbPath
102         os.mkdir(dbPath)
103
104     conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
105     sqlhub.processConnection = conn
106     Slice.createTable(ifNotExists=True)
107     EucaInstance.createTable(ifNotExists=True)
108
109 ##
110 # Creates a connection to Eucalytpus. This function is inspired by 
111 # the make_connection() in Euca2ools.
112 #
113 # @return A connection object or None
114 #
115 def getEucaConnection():
116     global cloud
117     accessKey = cloud['access_key']
118     secretKey = cloud['secret_key']
119     eucaURL   = cloud['cloud_url']
120     useSSL    = False
121     srvPath   = '/'
122     eucaPort  = 8773
123
124     if not accessKey or not secretKey or not eucaURL:
125         print >>sys.stderr, 'Please set ALL of the required environment ' \
126                             'variables by sourcing the eucarc file.'
127         return None
128     
129     # Split the url into parts
130     if eucaURL.find('https://') >= 0:
131         useSSL  = True
132         eucaURL = eucaURL.replace('https://', '')
133     elif eucaURL.find('http://') >= 0:
134         useSSL  = False
135         eucaURL = eucaURL.replace('http://', '')
136     (eucaHost, parts) = eucaURL.split(':')
137     if len(parts) > 1:
138         parts = parts.split('/')
139         eucaPort = int(parts[0])
140         parts = parts[1:]
141         srvPath = '/'.join(parts)
142
143     return boto.connect_ec2(aws_access_key_id=accessKey,
144                             aws_secret_access_key=secretKey,
145                             is_secure=useSSL,
146                             region=RegionInfo(None, 'eucalyptus', eucaHost), 
147                             port=eucaPort,
148                             path=srvPath)
149
150 ##
151 # A class that builds the RSpec for Eucalyptus.
152 #
153 class EucaRSpecBuilder(object):
154     ##
155     # Initizes a RSpec builder
156     #
157     # @param cloud A dictionary containing data about a 
158     #              cloud (ex. clusters, ip)
159     def __init__(self, cloud):
160         self.eucaRSpec = XMLBuilder()
161         self.cloudInfo = cloud
162
163     ##
164     # Creates the ClusterSpec stanza.
165     #
166     # @param clusters Clusters information.
167     #
168     def __clustersXML(self, clusters):
169         xml = self.eucaRSpec
170         for cluster in clusters:
171             instances = cluster['instances']
172             with xml.ClusterSpec(id=cluster['name'], ip=cluster['ip']):
173                 for inst in instances:
174                     with xml.Node(instanceType=inst[0]):
175                         with xml.FreeSlot:
176                             xml << str(inst[1])
177                         with xml.MaxAllow:
178                             xml << str(inst[2])
179                         with xml.NumCore:
180                             xml << str(inst[3])
181                         with xml.Mem:
182                             xml << str(inst[4])
183                         with xml.DiskSpace(unit='GB'):
184                             xml << str(inst[5])
185
186     ##
187     # Creates the Images stanza.
188     #
189     # @param images A list of images in Eucalyptus.
190     #
191     def __imagesXML(self, images):
192         xml = self.eucaRSpec
193         with xml.Images:
194             for image in images:
195                 with xml.Image(id=image.id):
196                     with xml.Type:
197                         xml << image.type
198                     with xml.Arch:
199                         xml << image.architecture
200                     with xml.State:
201                         xml << image.state
202                     with xml.location:
203                         xml << image.location
204
205     ##
206     # Creates the KeyPairs stanza.
207     #
208     # @param keypairs A list of key pairs in Eucalyptus.
209     #
210     def __keyPairsXML(self, keypairs):
211         xml = self.eucaRSpec
212         with xml.KeyPairs:
213             for key in keypairs:
214                 with xml.Key:
215                     xml << key.name
216
217     ##
218     # Generates the RSpec.
219     #
220     def toXML(self):
221         if not self.cloudInfo:
222             print >>sys.stderr, 'No cloud information'
223             return ''
224
225         xml = self.eucaRSpec
226         cloud = self.cloudInfo
227         with xml.RSpec(name='eucalyptus'):
228             with xml.Capacity:
229                 with xml.CloudSpec(id=cloud['name'], ip=cloud['ip']):
230                     self.__keyPairsXML(cloud['keypairs'])
231                     self.__imagesXML(cloud['images'])
232                     self.__clustersXML(cloud['clusters'])
233             with xml.Request:
234                 with xml.CloudSpec(id=cloud['name'], ip=cloud['ip']):
235                     with xml.Credential(type='X509'):
236                         xml << 'cred'
237                     with xml.Node(instanceType='m1.small', number='1'):
238                         with xml.Kernel:
239                             xml << 'eki-F26610C6'
240                         with xml.Ramdisk:
241                             xml << ''
242                         with xml.DiskImage:
243                             xml << 'emi-88760F45'
244                         with xml.Key:
245                             xml << 'cortex'
246         return str(xml)
247
248 ##
249 # A parser to parse the output of availability-zones.
250 #
251 # Note: Only one cluster is supported. If more than one, this will
252 #       not work.
253 #
254 class ZoneResultParser(object):
255     def __init__(self, zones):
256         self.zones = zones
257
258     def parse(self):
259         if len(self.zones) < 3:
260             return
261         clusterList = []
262         cluster = {} 
263         instList = []
264
265         cluster['name'] = self.zones[0].name
266         cluster['ip']   = self.zones[0].state
267
268         for i in range(2, len(self.zones)):
269             currZone = self.zones[i]
270             instType = currZone.name.split()[1]
271
272             stateString = currZone.state.split('/')
273             rscString   = stateString[1].split()
274
275             instFree      = int(stateString[0])
276             instMax       = int(rscString[0])
277             instNumCpu    = int(rscString[1])
278             instRam       = int(rscString[2])
279             instDiskSpace = int(rscString[3])
280
281             instTuple = (instType, instFree, instMax, instNumCpu, instRam, instDiskSpace)
282             instList.append(instTuple)
283         cluster['instances'] = instList
284         clusterList.append(cluster)
285
286         return clusterList
287
288 def get_rspec(api, xrn, origin_hrn):
289     global cloud
290     hrn = urn_to_hrn(xrn)[0]
291     conn = getEucaConnection()
292
293     if not conn:
294         print >>sys.stderr, 'Error: Cannot create a connection to Eucalyptus'
295         return 'Cannot create a connection to Eucalyptus'
296
297     try:
298         # Zones
299         zones = conn.get_all_zones(['verbose'])
300         p = ZoneResultParser(zones)
301         clusters = p.parse()
302         cloud['clusters'] = clusters
303         
304         # Images
305         images = conn.get_all_images()
306         cloud['images'] = images
307
308         # Key Pairs
309         keyPairs = conn.get_all_key_pairs()
310         cloud['keypairs'] = keyPairs
311     except EC2ResponseError, ec2RespErr:
312         errTree = ET.fromstring(ec2RespErr.body)
313         errMsgE = errTree.find('.//Message')
314         print >>sys.stderr, errMsgE.text
315
316     rspec = EucaRSpecBuilder(cloud).toXML()
317
318     return rspec
319
320 """
321 Hook called via 'sfi.py create'
322 """
323 def create_slice(api, xrn, xml):
324     global cloud
325     hrn = urn_to_hrn(xrn)[0]
326
327     conn = getEucaConnection()
328     if not conn:
329         print >>sys.stderr, 'Error: Cannot create a connection to Eucalyptus'
330         return False
331
332     # Get the slice from db or create one.
333     # XXX: For testing purposes, I'll just create the slice.
334     #s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
335     #if s is None:
336     s = Slice(slice_hrn = hrn)
337
338     # Process the RSpec
339     rspecXML = RSpec(xml)
340     rspecDict = rspecXML.toDict()
341     request = rspecDict['RSpec']['Request']
342     for cloudSpec in request:
343         cloudSpec = cloudSpec['CloudSpec']
344         for cloudReqInfo in cloudSpec:
345             for nodeReq in cloudReqInfo['Node']:
346                 instKernel  = nodeReq['Kernel'][0]
347                 instDiskImg = nodeReq['DiskImage'][0]
348                 instRamDisk = nodeReq['Ramdisk'][0]
349                 instKey     = nodeReq['Key'][0]
350                 instType    = nodeReq['instanceType']
351                 numInst     = int(nodeReq['number'])
352
353                 # Ramdisk is optional.
354                 if isinstance(instRamDisk, dict):
355                     instRamDisk = None
356
357                 # Create the instances
358                 for i in range(0, numInst):
359                     eucaInst = EucaInstance(slice = s, 
360                                             kernel_id = instKernel,
361                                             image_id = instDiskImg,
362                                             ramdisk_id = instRamDisk,
363                                             key_pair = instKey,
364                                             inst_type = instType)
365                     eucaInst.reserveInstance(conn)
366
367     return True
368
369 def main():
370     init_server()
371     r = RSpec()
372     r.parseFile(sys.argv[1])
373     rspec = r.toDict()
374     create_slice(None,'planetcloud.pc.test',rspec)
375     #rspec = get_rspec('euca', 'hrn:euca', 'oring_hrn')
376     #print rspec
377
378 if __name__ == "__main__":
379     main()
380