Compatibility changes
[sfa.git] / sfa / managers / aggregate_manager_eucalyptus.py
1 from __future__ import with_statement 
2
3 import sys
4 import os
5 import logging
6 import datetime
7
8 import boto
9 from boto.ec2.regioninfo import RegionInfo
10 from boto.exception import EC2ResponseError
11 from ConfigParser import ConfigParser
12 from xmlbuilder import XMLBuilder
13 from lxml import etree as ET
14 from sqlobject import *
15
16 from sfa.util.faults import *
17 from sfa.util.xrn import urn_to_hrn, Xrn
18 from sfa.util.rspec import RSpec
19 from sfa.server.registry import Registries
20 from sfa.trust.credential import Credential
21 from sfa.plc.api import SfaAPI
22 from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
23 from sfa.util.callids import Callids
24 from sfa.util.sfalogging import logger
25 from sfa.rspecs.sfa_rspec import sfa_rspec_version
26 from sfa.util.version import version_core
27
28 from multiprocessing import Process
29 from time import sleep
30
31 ##
32 # The data structure used to represent a cloud.
33 # It contains the cloud name, its ip address, image information,
34 # key pairs, and clusters information.
35 #
36 cloud = {}
37
38 ##
39 # The location of the RelaxNG schema.
40 #
41 EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
42
43 api = SfaAPI()
44
45 ##
46 # Meta data of an instance.
47 #
48 class Meta(SQLObject):
49     instance   = SingleJoin('EucaInstance')
50     state      = StringCol(default = 'new')
51     pub_addr   = StringCol(default = None)
52     pri_addr   = StringCol(default = None)
53     start_time = DateTimeCol(default = None)
54
55 ##
56 # A representation of an Eucalyptus instance. This is a support class
57 # for instance <-> slice mapping.
58 #
59 class EucaInstance(SQLObject):
60     instance_id = StringCol(unique=True, default=None)
61     kernel_id   = StringCol()
62     image_id    = StringCol()
63     ramdisk_id  = StringCol()
64     inst_type   = StringCol()
65     key_pair    = StringCol()
66     slice       = ForeignKey('Slice')
67     meta        = ForeignKey('Meta')
68
69     ##
70     # Contacts Eucalyptus and tries to reserve this instance.
71     # 
72     # @param botoConn A connection to Eucalyptus.
73     # @param pubKeys A list of public keys for the instance.
74     #
75     def reserveInstance(self, botoConn, pubKeys):
76         logger = logging.getLogger('EucaAggregate')
77         logger.info('Reserving an instance: image: %s, kernel: ' \
78                     '%s, ramdisk: %s, type: %s, key: %s' % \
79                     (self.image_id, self.kernel_id, self.ramdisk_id,
80                     self.inst_type, self.key_pair))
81
82         # XXX The return statement is for testing. REMOVE in production
83         #return
84
85         try:
86             reservation = botoConn.run_instances(self.image_id,
87                                                  kernel_id = self.kernel_id,
88                                                  ramdisk_id = self.ramdisk_id,
89                                                  instance_type = self.inst_type,
90                                                  key_name  = self.key_pair,
91                                                  user_data = pubKeys)
92             for instance in reservation.instances:
93                 self.instance_id = instance.id
94
95         # If there is an error, destroy itself.
96         except EC2ResponseError, ec2RespErr:
97             errTree = ET.fromstring(ec2RespErr.body)
98             msg = errTree.find('.//Message')
99             logger.error(msg.text)
100             self.destroySelf()
101
102 ##
103 # A representation of a PlanetLab slice. This is a support class
104 # for instance <-> slice mapping.
105 #
106 class Slice(SQLObject):
107     slice_hrn = StringCol()
108     #slice_index = DatabaseIndex('slice_hrn')
109     instances = MultipleJoin('EucaInstance')
110
111 ##
112 # Initialize the aggregate manager by reading a configuration file.
113 #
114 def init_server():
115     logger = logging.getLogger('EucaAggregate')
116     fileHandler = logging.FileHandler('/var/log/euca.log')
117     fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
118     logger.addHandler(fileHandler)
119     fileHandler.setLevel(logging.DEBUG)
120     logger.setLevel(logging.DEBUG)
121
122     configParser = ConfigParser()
123     configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
124     if len(configParser.sections()) < 1:
125         logger.error('No cloud defined in the config file')
126         raise Exception('Cannot find cloud definition in configuration file.')
127
128     # Only read the first section.
129     cloudSec = configParser.sections()[0]
130     cloud['name'] = cloudSec
131     cloud['access_key'] = configParser.get(cloudSec, 'access_key')
132     cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
133     cloud['cloud_url']  = configParser.get(cloudSec, 'cloud_url')
134     cloudURL = cloud['cloud_url']
135     if cloudURL.find('https://') >= 0:
136         cloudURL = cloudURL.replace('https://', '')
137     elif cloudURL.find('http://') >= 0:
138         cloudURL = cloudURL.replace('http://', '')
139     (cloud['ip'], parts) = cloudURL.split(':')
140
141     # Create image bundles
142     images = getEucaConnection().get_all_images()
143     cloud['images'] = images
144     cloud['imageBundles'] = {}
145     for i in images:
146         if i.type != 'machine' or i.kernel_id is None: continue
147         name = os.path.dirname(i.location)
148         detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
149         cloud['imageBundles'][name] = detail
150
151     # Initialize sqlite3 database and tables.
152     dbPath = '/etc/sfa/db'
153     dbName = 'euca_aggregate.db'
154
155     if not os.path.isdir(dbPath):
156         logger.info('%s not found. Creating directory ...' % dbPath)
157         os.mkdir(dbPath)
158
159     conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
160     sqlhub.processConnection = conn
161     Slice.createTable(ifNotExists=True)
162     EucaInstance.createTable(ifNotExists=True)
163     Meta.createTable(ifNotExists=True)
164
165     # Start the update process to keep track of the meta data
166     # about Eucalyptus instance.
167     Process(target=updateMeta).start()
168
169     # Make sure the schema exists.
170     if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA):
171         err = 'Cannot location schema at %s' % EUCALYPTUS_RSPEC_SCHEMA
172         logger.error(err)
173         raise Exception(err)
174
175 ##
176 # Creates a connection to Eucalytpus. This function is inspired by 
177 # the make_connection() in Euca2ools.
178 #
179 # @return A connection object or None
180 #
181 def getEucaConnection():
182     global cloud
183     accessKey = cloud['access_key']
184     secretKey = cloud['secret_key']
185     eucaURL   = cloud['cloud_url']
186     useSSL    = False
187     srvPath   = '/'
188     eucaPort  = 8773
189     logger    = logging.getLogger('EucaAggregate')
190
191     if not accessKey or not secretKey or not eucaURL:
192         logger.error('Please set ALL of the required environment ' \
193                      'variables by sourcing the eucarc file.')
194         return None
195     
196     # Split the url into parts
197     if eucaURL.find('https://') >= 0:
198         useSSL  = True
199         eucaURL = eucaURL.replace('https://', '')
200     elif eucaURL.find('http://') >= 0:
201         useSSL  = False
202         eucaURL = eucaURL.replace('http://', '')
203     (eucaHost, parts) = eucaURL.split(':')
204     if len(parts) > 1:
205         parts = parts.split('/')
206         eucaPort = int(parts[0])
207         parts = parts[1:]
208         srvPath = '/'.join(parts)
209
210     return boto.connect_ec2(aws_access_key_id=accessKey,
211                             aws_secret_access_key=secretKey,
212                             is_secure=useSSL,
213                             region=RegionInfo(None, 'eucalyptus', eucaHost), 
214                             port=eucaPort,
215                             path=srvPath)
216
217 ##
218 # Returns a string of keys that belong to the users of the given slice.
219 # @param sliceHRN The hunman readable name of the slice.
220 # @return sting()
221 #
222 def getKeysForSlice(api, sliceHRN):
223     logger   = logging.getLogger('EucaAggregate')
224     cred     = api.getCredential()
225     registry = api.registries[api.hrn]
226     keys     = []
227
228     # Get the slice record
229     records = registry.Resolve(sliceHRN, cred)
230     if not records:
231         logging.warn('Cannot find any record for slice %s' % sliceHRN)
232         return []
233
234     # Find who can log into this slice
235     persons = records[0]['persons']
236
237     # Extract the keys from persons records
238     for p in persons:
239         sliceUser = registry.Resolve(p, cred)
240         userKeys = sliceUser[0]['keys']
241         keys += userKeys
242
243     return ''.join(keys)
244
245 ##
246 # A class that builds the RSpec for Eucalyptus.
247 #
248 class EucaRSpecBuilder(object):
249     ##
250     # Initizes a RSpec builder
251     #
252     # @param cloud A dictionary containing data about a 
253     #              cloud (ex. clusters, ip)
254     def __init__(self, cloud):
255         self.eucaRSpec = XMLBuilder(format = True, tab_step = "  ")
256         self.cloudInfo = cloud
257
258     ##
259     # Creates a request stanza.
260     # 
261     # @param num The number of instances to create.
262     # @param image The disk image id.
263     # @param kernel The kernel image id.
264     # @param keypair Key pair to embed.
265     # @param ramdisk Ramdisk id (optional).
266     #
267     def __requestXML(self, num, image, kernel, keypair, ramdisk = ''):
268         xml = self.eucaRSpec
269         with xml.request:
270             with xml.instances:
271                 xml << str(num)
272             with xml.kernel_image(id=kernel):
273                 xml << ''
274             if ramdisk == '':
275                 with xml.ramdisk:
276                     xml << ''
277             else:
278                 with xml.ramdisk(id=ramdisk):
279                     xml << ''
280             with xml.disk_image(id=image):
281                 xml << ''
282             with xml.keypair:
283                 xml << keypair
284
285     ##
286     # Creates the cluster stanza.
287     #
288     # @param clusters Clusters information.
289     #
290     def __clustersXML(self, clusters):
291         cloud = self.cloudInfo
292         xml = self.eucaRSpec
293
294         for cluster in clusters:
295             instances = cluster['instances']
296             with xml.cluster(id=cluster['name']):
297                 with xml.ipv4:
298                     xml << cluster['ip']
299                 with xml.vm_types:
300                     for inst in instances:
301                         with xml.vm_type(name=inst[0]):
302                             with xml.free_slots:
303                                 xml << str(inst[1])
304                             with xml.max_instances:
305                                 xml << str(inst[2])
306                             with xml.cores:
307                                 xml << str(inst[3])
308                             with xml.memory(unit='MB'):
309                                 xml << str(inst[4])
310                             with xml.disk_space(unit='GB'):
311                                 xml << str(inst[5])
312                             if 'instances' in cloud and inst[0] in cloud['instances']:
313                                 existingEucaInstances = cloud['instances'][inst[0]]
314                                 with xml.euca_instances:
315                                     for eucaInst in existingEucaInstances:
316                                         with xml.euca_instance(id=eucaInst['id']):
317                                             with xml.state:
318                                                 xml << eucaInst['state']
319                                             with xml.public_dns:
320                                                 xml << eucaInst['public_dns']
321
322     def __imageBundleXML(self, bundles):
323         xml = self.eucaRSpec
324         with xml.bundles:
325             for bundle in bundles.keys():
326                 with xml.bundle(id=bundle):
327                     xml << ''
328
329     ##
330     # Creates the Images stanza.
331     #
332     # @param images A list of images in Eucalyptus.
333     #
334     def __imagesXML(self, images):
335         xml = self.eucaRSpec
336         with xml.images:
337             for image in images:
338                 with xml.image(id=image.id):
339                     with xml.type:
340                         xml << image.type
341                     with xml.arch:
342                         xml << image.architecture
343                     with xml.state:
344                         xml << image.state
345                     with xml.location:
346                         xml << image.location
347
348     ##
349     # Creates the KeyPairs stanza.
350     #
351     # @param keypairs A list of key pairs in Eucalyptus.
352     #
353     def __keyPairsXML(self, keypairs):
354         xml = self.eucaRSpec
355         with xml.keypairs:
356             for key in keypairs:
357                 with xml.keypair:
358                     xml << key.name
359
360     ##
361     # Generates the RSpec.
362     #
363     def toXML(self):
364         logger = logging.getLogger('EucaAggregate')
365         if not self.cloudInfo:
366             logger.error('No cloud information')
367             return ''
368
369         xml = self.eucaRSpec
370         cloud = self.cloudInfo
371         with xml.RSpec(type='eucalyptus'):
372             with xml.network(name=cloud['name']):
373                 with xml.ipv4:
374                     xml << cloud['ip']
375                 #self.__keyPairsXML(cloud['keypairs'])
376                 #self.__imagesXML(cloud['images'])
377                 self.__imageBundleXML(cloud['imageBundles'])
378                 self.__clustersXML(cloud['clusters'])
379         return str(xml)
380
381 ##
382 # A parser to parse the output of availability-zones.
383 #
384 # Note: Only one cluster is supported. If more than one, this will
385 #       not work.
386 #
387 class ZoneResultParser(object):
388     def __init__(self, zones):
389         self.zones = zones
390
391     def parse(self):
392         if len(self.zones) < 3:
393             return
394         clusterList = []
395         cluster = {} 
396         instList = []
397
398         cluster['name'] = self.zones[0].name
399         cluster['ip']   = self.zones[0].state
400
401         for i in range(2, len(self.zones)):
402             currZone = self.zones[i]
403             instType = currZone.name.split()[1]
404
405             stateString = currZone.state.split('/')
406             rscString   = stateString[1].split()
407
408             instFree      = int(stateString[0])
409             instMax       = int(rscString[0])
410             instNumCpu    = int(rscString[1])
411             instRam       = int(rscString[2])
412             instDiskSpace = int(rscString[3])
413
414             instTuple = (instType, instFree, instMax, instNumCpu, instRam, instDiskSpace)
415             instList.append(instTuple)
416         cluster['instances'] = instList
417         clusterList.append(cluster)
418
419         return clusterList
420
421 def ListResources(api, creds, options, call_id): 
422     if Callids().already_handled(call_id): return ""
423     global cloud
424     # get slice's hrn from options
425     xrn = options.get('geni_slice_urn', '')
426     hrn, type = urn_to_hrn(xrn)
427     logger = logging.getLogger('EucaAggregate')
428
429     # get hrn of the original caller
430     origin_hrn = options.get('origin_hrn', None)
431     if not origin_hrn:
432         origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
433
434     conn = getEucaConnection()
435
436     if not conn:
437         logger.error('Cannot create a connection to Eucalyptus')
438         return 'Cannot create a connection to Eucalyptus'
439
440     try:
441         # Zones
442         zones = conn.get_all_zones(['verbose'])
443         p = ZoneResultParser(zones)
444         clusters = p.parse()
445         cloud['clusters'] = clusters
446         
447         # Images
448         images = conn.get_all_images()
449         cloud['images'] = images
450         cloud['imageBundles'] = {}
451         for i in images:
452             if i.type != 'machine' or i.kernel_id is None: continue
453             name = os.path.dirname(i.location)
454             detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
455             cloud['imageBundles'][name] = detail
456
457         # Key Pairs
458         keyPairs = conn.get_all_key_pairs()
459         cloud['keypairs'] = keyPairs
460
461         if hrn:
462             instanceId = []
463             instances  = []
464
465             # Get the instances that belong to the given slice from sqlite3
466             # XXX use getOne() in production because the slice's hrn is supposed
467             # to be unique. For testing, uniqueness is turned off in the db.
468             # If the slice isn't found in the database, create a record for the 
469             # slice.
470             matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
471             if matchedSlices:
472                 theSlice = matchedSlices[-1]
473             else:
474                 theSlice = Slice(slice_hrn = hrn)
475             for instance in theSlice.instances:
476                 instanceId.append(instance.instance_id)
477
478             # Get the information about those instances using their ids.
479             if len(instanceId) > 0:
480                 reservations = conn.get_all_instances(instanceId)
481             else:
482                 reservations = []
483             for reservation in reservations:
484                 for instance in reservation.instances:
485                     instances.append(instance)
486
487             # Construct a dictionary for the EucaRSpecBuilder
488             instancesDict = {}
489             for instance in instances:
490                 instList = instancesDict.setdefault(instance.instance_type, [])
491                 instInfoDict = {} 
492
493                 instInfoDict['id'] = instance.id
494                 instInfoDict['public_dns'] = instance.public_dns_name
495                 instInfoDict['state'] = instance.state
496                 instInfoDict['key'] = instance.key_name
497
498                 instList.append(instInfoDict)
499             cloud['instances'] = instancesDict
500
501     except EC2ResponseError, ec2RespErr:
502         errTree = ET.fromstring(ec2RespErr.body)
503         errMsgE = errTree.find('.//Message')
504         logger.error(errMsgE.text)
505
506     rspec = EucaRSpecBuilder(cloud).toXML()
507
508     # Remove the instances records so next time they won't 
509     # show up.
510     if 'instances' in cloud:
511         del cloud['instances']
512
513     return rspec
514
515 """
516 Hook called via 'sfi.py create'
517 """
518 def CreateSliver(api, xrn, creds, xml, users, call_id):
519     if Callids().already_handled(call_id): return ""
520
521     global cloud
522     hrn = urn_to_hrn(xrn)[0]
523     logger = logging.getLogger('EucaAggregate')
524
525     conn = getEucaConnection()
526     if not conn:
527         logger.error('Cannot create a connection to Eucalyptus')
528         return ""
529
530     # Validate RSpec
531     schemaXML = ET.parse(EUCALYPTUS_RSPEC_SCHEMA)
532     rspecValidator = ET.RelaxNG(schemaXML)
533     rspecXML = ET.XML(xml)
534     for network in rspecXML.iterfind("./network"):
535         if network.get('id') != cloud['name']:
536             # Throw away everything except my own RSpec
537             # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
538             network.getparent().remove(network)
539     if not rspecValidator(rspecXML):
540         error = rspecValidator.error_log.last_error
541         message = '%s (line %s)' % (error.message, error.line) 
542         # XXX: InvalidRSpec is new. Currently, I am not working with Trunk code.
543         #raise InvalidRSpec(message)
544         raise Exception(message)
545
546     # Get the slice from db or create one.
547     s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
548     if s is None:
549         s = Slice(slice_hrn = hrn)
550
551     # Process any changes in existing instance allocation
552     pendingRmInst = []
553     for sliceInst in s.instances:
554         pendingRmInst.append(sliceInst.instance_id)
555     existingInstGroup = rspecXML.findall(".//euca_instances")
556     for instGroup in existingInstGroup:
557         for existingInst in instGroup:
558             if existingInst.get('id') in pendingRmInst:
559                 pendingRmInst.remove(existingInst.get('id'))
560     for inst in pendingRmInst:
561         logger.debug('Instance %s will be terminated' % inst)
562         dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
563         # Only change the state but do not remove the entry from the DB.
564         dbInst.meta.state = 'deleted'
565         #dbInst.destroySelf()
566     conn.terminate_instances(pendingRmInst)
567
568     # Process new instance requests
569     requests = rspecXML.findall(".//request")
570     if requests:
571         # Get all the public keys associate with slice.
572         pubKeys = getKeysForSlice(api, s.slice_hrn)
573         logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
574     for req in requests:
575         vmTypeElement = req.getparent()
576         instType = vmTypeElement.get('name')
577         numInst  = int(req.find('instances').text)
578         
579         bundleName = req.find('bundle').text
580         if not cloud['imageBundles'][bundleName]:
581             logger.error('Cannot find bundle %s' % bundleName)
582         bundleInfo = cloud['imageBundles'][bundleName]
583         instKernel  = bundleInfo['kernelID']
584         instDiskImg = bundleInfo['imageID']
585         instRamDisk = bundleInfo['ramdiskID']
586         instKey     = None
587
588         # Create the instances
589         for i in range(0, numInst):
590             eucaInst = EucaInstance(slice      = s,
591                                     kernel_id  = instKernel,
592                                     image_id   = instDiskImg,
593                                     ramdisk_id = instRamDisk,
594                                     key_pair   = instKey,
595                                     inst_type  = instType,
596                                     meta       = Meta(start_time=datetime.datetime.now()))
597             eucaInst.reserveInstance(conn, pubKeys)
598
599     # xxx - should return altered rspec 
600     # with enough data for the client to understand what's happened
601     return xml
602
603 ##
604 # A separate process that will update the meta data.
605 #
606 def updateMeta():
607     logger = logging.getLogger('EucaMeta')
608     fileHandler = logging.FileHandler('/var/log/euca_meta.log')
609     fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
610     logger.addHandler(fileHandler)
611     fileHandler.setLevel(logging.DEBUG)
612     logger.setLevel(logging.DEBUG)
613
614     while True:
615         sleep(30)
616
617         # Get IDs of the instances that don't have IPs yet.
618         dbResults = Meta.select(
619                       AND(Meta.q.pri_addr == None,
620                           Meta.q.state    != 'deleted')
621                     )
622         dbResults = list(dbResults)
623         logger.debug('[update process] dbResults: %s' % dbResults)
624         instids = []
625         for r in dbResults:
626             instids.append(r.instance.instance_id)
627         logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
628
629         # Get instance information from Eucalyptus
630         conn = getEucaConnection()
631         vmInstances = []
632         reservations = conn.get_all_instances(instids)
633         for reservation in reservations:
634             vmInstances += reservation.instances
635
636         # Check the IPs
637         instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
638                     for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
639         logger.debug('[update process] IP dict: %s' % str(instIPs))
640
641         # Update the local DB
642         for ipData in instIPs:
643             dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
644             if not dbInst:
645                 logger.info('[update process] Could not find %s in DB' % ipData['id'])
646                 continue
647             dbInst.meta.pri_addr = ipData['pri_addr']
648             dbInst.meta.pub_addr = ipData['pub_addr']
649             dbInst.meta.state    = 'running'
650
651 def GetVersion(api):
652     xrn=Xrn(api.hrn)
653     request_rspec_versions = [dict(sfa_rspec_version)]
654     ad_rspec_versions = [dict(sfa_rspec_version)]
655     version_more = {'interface':'aggregate',
656                     'testbed':'myplc',
657                     'hrn':xrn.get_hrn(),
658                     'request_rspec_versions': request_rspec_versions,
659                     'ad_rspec_versions': ad_rspec_versions,
660                     'default_ad_rspec': dict(sfa_rspec_version)
661                     }
662     return version_core(version_more)
663
664 def main():
665     init_server()
666
667     #theRSpec = None
668     #with open(sys.argv[1]) as xml:
669     #    theRSpec = xml.read()
670     #CreateSliver(None, 'planetcloud.pc.test', theRSpec, 'call-id-cloudtest')
671
672     #rspec = ListResources('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy', 'test_euca')
673     #print rspec
674
675     server_key_file = '/var/lib/sfa/authorities/server.key'
676     server_cert_file = '/var/lib/sfa/authorities/server.cert'
677     api = SfaAPI(key_file = server_key_file, cert_file = server_cert_file, interface='aggregate')
678     print getKeysForSlice(api, 'gc.gc.test1')
679
680 if __name__ == "__main__":
681     main()
682