1 from __future__ import with_statement
7 from multiprocessing import Process
11 from boto.ec2.regioninfo import RegionInfo
12 from boto.exception import EC2ResponseError
13 from ConfigParser import ConfigParser
14 from xmlbuilder import XMLBuilder
15 from lxml import etree as ET
16 from sqlobject import *
18 from sfa.util.faults import InvalidRSpec
19 from sfa.util.xrn import urn_to_hrn, Xrn
20 from sfa.util.callids import Callids
21 #comes with its own logging
22 #from sfa.util.sfalogging import logger
23 from sfa.util.version import version_core
25 from sfa.trust.credential import Credential
27 from sfa.rspecs.version_manager import VersionManager
28 from sfa.rspecs.rspec import RSpec
30 from sfa.planetlab.plaggregate import PlAggregate
31 from sfa.planetlab.plslices import PlSlices
32 from sfa.planetlab.plxrn import slicename_to_hrn
35 # Meta data of an instance.
37 class Meta(SQLObject):
38 instance = SingleJoin('EucaInstance')
39 state = StringCol(default = 'new')
40 pub_addr = StringCol(default = None)
41 pri_addr = StringCol(default = None)
42 start_time = DateTimeCol(default = None)
45 # A representation of an Eucalyptus instance. This is a support class
46 # for instance <-> slice mapping.
48 class EucaInstance(SQLObject):
49 instance_id = StringCol(unique=True, default=None)
50 kernel_id = StringCol()
51 image_id = StringCol()
52 ramdisk_id = StringCol()
53 inst_type = StringCol()
54 key_pair = StringCol()
55 slice = ForeignKey('Slice')
56 meta = ForeignKey('Meta')
59 # Contacts Eucalyptus and tries to reserve this instance.
61 # @param botoConn A connection to Eucalyptus.
62 # @param pubKeys A list of public keys for the instance.
64 def reserveInstance(self, botoConn, pubKeys):
65 logger = logging.getLogger('EucaAggregate')
66 logger.info('Reserving an instance: image: %s, kernel: ' \
67 '%s, ramdisk: %s, type: %s, key: %s' % \
68 (self.image_id, self.kernel_id, self.ramdisk_id,
69 self.inst_type, self.key_pair))
72 reservation = botoConn.run_instances(self.image_id,
73 kernel_id = self.kernel_id,
74 ramdisk_id = self.ramdisk_id,
75 instance_type = self.inst_type,
76 key_name = self.key_pair,
78 for instance in reservation.instances:
79 self.instance_id = instance.id
81 # If there is an error, destroy itself.
82 except EC2ResponseError, ec2RespErr:
83 errTree = ET.fromstring(ec2RespErr.body)
84 msg = errTree.find('.//Message')
85 logger.error(msg.text)
89 # A representation of a PlanetLab slice. This is a support class
90 # for instance <-> slice mapping.
92 class Slice(SQLObject):
93 slice_hrn = StringCol()
94 #slice_index = DatabaseIndex('slice_hrn')
95 instances = MultipleJoin('EucaInstance')
98 # A class that builds the RSpec for Eucalyptus.
100 class EucaRSpecBuilder(object):
102 # Initizes a RSpec builder
104 # @param cloud A dictionary containing data about a
105 # cloud (ex. clusters, ip)
106 def __init__(self, cloud):
107 self.eucaRSpec = XMLBuilder(format = True, tab_step = " ")
108 self.cloudInfo = cloud
111 # Creates a request stanza.
113 # @param num The number of instances to create.
114 # @param image The disk image id.
115 # @param kernel The kernel image id.
116 # @param keypair Key pair to embed.
117 # @param ramdisk Ramdisk id (optional).
119 def __requestXML(self, num, image, kernel, keypair, ramdisk = ''):
124 with xml.kernel_image(id=kernel):
130 with xml.ramdisk(id=ramdisk):
132 with xml.disk_image(id=image):
138 # Creates the cluster stanza.
140 # @param clusters Clusters information.
142 def __clustersXML(self, clusters):
143 cloud = self.cloudInfo
146 for cluster in clusters:
147 instances = cluster['instances']
148 with xml.cluster(id=cluster['name']):
152 for inst in instances:
153 with xml.vm_type(name=inst[0]):
156 with xml.max_instances:
160 with xml.memory(unit='MB'):
162 with xml.disk_space(unit='GB'):
164 if 'instances' in cloud and inst[0] in cloud['instances']:
165 existingEucaInstances = cloud['instances'][inst[0]]
166 with xml.euca_instances:
167 for eucaInst in existingEucaInstances:
168 with xml.euca_instance(id=eucaInst['id']):
170 xml << eucaInst['state']
172 xml << eucaInst['public_dns']
174 def __imageBundleXML(self, bundles):
177 for bundle in bundles.keys():
178 with xml.bundle(id=bundle):
182 # Creates the Images stanza.
184 # @param images A list of images in Eucalyptus.
186 def __imagesXML(self, images):
190 with xml.image(id=image.id):
194 xml << image.architecture
198 xml << image.location
201 # Creates the KeyPairs stanza.
203 # @param keypairs A list of key pairs in Eucalyptus.
205 def __keyPairsXML(self, keypairs):
213 # Generates the RSpec.
216 logger = logging.getLogger('EucaAggregate')
217 if not self.cloudInfo:
218 logger.error('No cloud information')
222 cloud = self.cloudInfo
223 with xml.RSpec(type='eucalyptus'):
224 with xml.network(name=cloud['name']):
227 #self.__keyPairsXML(cloud['keypairs'])
228 #self.__imagesXML(cloud['images'])
229 self.__imageBundleXML(cloud['imageBundles'])
230 self.__clustersXML(cloud['clusters'])
234 # A parser to parse the output of availability-zones.
236 # Note: Only one cluster is supported. If more than one, this will
239 class ZoneResultParser(object):
240 def __init__(self, zones):
244 if len(self.zones) < 3:
250 cluster['name'] = self.zones[0].name
251 cluster['ip'] = self.zones[0].state
253 for i in range(2, len(self.zones)):
254 currZone = self.zones[i]
255 instType = currZone.name.split()[1]
257 stateString = currZone.state.split('/')
258 rscString = stateString[1].split()
260 instFree = int(stateString[0])
261 instMax = int(rscString[0])
262 instNumCpu = int(rscString[1])
263 instRam = int(rscString[2])
264 instDiskSpace = int(rscString[3])
266 instTuple = (instType, instFree, instMax, instNumCpu, instRam, instDiskSpace)
267 instList.append(instTuple)
268 cluster['instances'] = instList
269 clusterList.append(cluster)
273 class AggregateManagerEucalyptus:
275 # The data structure used to represent a cloud.
276 # It contains the cloud name, its ip address, image information,
277 # key pairs, and clusters information.
280 # The location of the RelaxNG schema.
281 EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
285 # the init_server mechanism has vanished
286 def __init__ (self, config):
287 if AggregateManagerEucalyptus._inited: return
288 AggregateManagerEucalyptus.init_server()
290 # Initialize the aggregate manager by reading a configuration file.
293 logger = logging.getLogger('EucaAggregate')
294 fileHandler = logging.FileHandler('/var/log/euca.log')
295 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
296 logger.addHandler(fileHandler)
297 fileHandler.setLevel(logging.DEBUG)
298 logger.setLevel(logging.DEBUG)
300 configParser = ConfigParser()
301 configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
302 if len(configParser.sections()) < 1:
303 logger.error('No cloud defined in the config file')
304 raise Exception('Cannot find cloud definition in configuration file.')
306 # Only read the first section.
307 cloudSec = configParser.sections()[0]
308 AggregateManagerEucalyptus.cloud['name'] = cloudSec
309 AggregateManagerEucalyptus.cloud['access_key'] = configParser.get(cloudSec, 'access_key')
310 AggregateManagerEucalyptus.cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
311 AggregateManagerEucalyptus.cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url')
312 cloudURL = AggregateManagerEucalyptus.cloud['cloud_url']
313 if cloudURL.find('https://') >= 0:
314 cloudURL = cloudURL.replace('https://', '')
315 elif cloudURL.find('http://') >= 0:
316 cloudURL = cloudURL.replace('http://', '')
317 (AggregateManagerEucalyptus.cloud['ip'], parts) = cloudURL.split(':')
319 # Create image bundles
320 images = self.getEucaConnection().get_all_images()
321 AggregateManagerEucalyptus.cloud['images'] = images
322 AggregateManagerEucalyptus.cloud['imageBundles'] = {}
324 if i.type != 'machine' or i.kernel_id is None: continue
325 name = os.path.dirname(i.location)
326 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
327 AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
329 # Initialize sqlite3 database and tables.
330 dbPath = '/etc/sfa/db'
331 dbName = 'euca_aggregate.db'
333 if not os.path.isdir(dbPath):
334 logger.info('%s not found. Creating directory ...' % dbPath)
337 conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
338 sqlhub.processConnection = conn
339 Slice.createTable(ifNotExists=True)
340 EucaInstance.createTable(ifNotExists=True)
341 Meta.createTable(ifNotExists=True)
343 # Start the update process to keep track of the meta data
344 # about Eucalyptus instance.
345 Process(target=AggregateManagerEucalyptus.updateMeta).start()
347 # Make sure the schema exists.
348 if not os.path.exists(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA):
349 err = 'Cannot location schema at %s' % AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA
354 # A separate process that will update the meta data.
358 logger = logging.getLogger('EucaMeta')
359 fileHandler = logging.FileHandler('/var/log/euca_meta.log')
360 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
361 logger.addHandler(fileHandler)
362 fileHandler.setLevel(logging.DEBUG)
363 logger.setLevel(logging.DEBUG)
368 # Get IDs of the instances that don't have IPs yet.
369 dbResults = Meta.select(
370 AND(Meta.q.pri_addr == None,
371 Meta.q.state != 'deleted')
373 dbResults = list(dbResults)
374 logger.debug('[update process] dbResults: %s' % dbResults)
379 instids.append(r.instance.instance_id)
380 logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
382 # Get instance information from Eucalyptus
383 conn = self.getEucaConnection()
385 reservations = conn.get_all_instances(instids)
386 for reservation in reservations:
387 vmInstances += reservation.instances
390 instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
391 for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
392 logger.debug('[update process] IP dict: %s' % str(instIPs))
394 # Update the local DB
395 for ipData in instIPs:
396 dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
398 logger.info('[update process] Could not find %s in DB' % ipData['id'])
400 dbInst.meta.pri_addr = ipData['pri_addr']
401 dbInst.meta.pub_addr = ipData['pub_addr']
402 dbInst.meta.state = 'running'
404 self.dumpinstanceInfo()
407 # Creates a connection to Eucalytpus. This function is inspired by
408 # the make_connection() in Euca2ools.
410 # @return A connection object or None
412 def getEucaConnection():
413 accessKey = AggregateManagerEucalyptus.cloud['access_key']
414 secretKey = AggregateManagerEucalyptus.cloud['secret_key']
415 eucaURL = AggregateManagerEucalyptus.cloud['cloud_url']
419 logger = logging.getLogger('EucaAggregate')
421 if not accessKey or not secretKey or not eucaURL:
422 logger.error('Please set ALL of the required environment ' \
423 'variables by sourcing the eucarc file.')
426 # Split the url into parts
427 if eucaURL.find('https://') >= 0:
429 eucaURL = eucaURL.replace('https://', '')
430 elif eucaURL.find('http://') >= 0:
432 eucaURL = eucaURL.replace('http://', '')
433 (eucaHost, parts) = eucaURL.split(':')
435 parts = parts.split('/')
436 eucaPort = int(parts[0])
438 srvPath = '/'.join(parts)
440 return boto.connect_ec2(aws_access_key_id=accessKey,
441 aws_secret_access_key=secretKey,
443 region=RegionInfo(None, 'eucalyptus', eucaHost),
447 def ListResources(api, creds, options):
448 call_id = options.get('call_id')
449 if Callids().already_handled(call_id): return ""
450 # get slice's hrn from options
451 xrn = options.get('geni_slice_urn', '')
452 hrn, type = urn_to_hrn(xrn)
453 logger = logging.getLogger('EucaAggregate')
455 # get hrn of the original caller
456 origin_hrn = options.get('origin_hrn', None)
458 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
460 conn = self.getEucaConnection()
463 logger.error('Cannot create a connection to Eucalyptus')
464 return 'Cannot create a connection to Eucalyptus'
468 zones = conn.get_all_zones(['verbose'])
469 p = ZoneResultParser(zones)
471 AggregateManagerEucalyptus.cloud['clusters'] = clusters
474 images = conn.get_all_images()
475 AggregateManagerEucalyptus.cloud['images'] = images
476 AggregateManagerEucalyptus.cloud['imageBundles'] = {}
478 if i.type != 'machine' or i.kernel_id is None: continue
479 name = os.path.dirname(i.location)
480 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
481 AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
484 keyPairs = conn.get_all_key_pairs()
485 AggregateManagerEucalyptus.cloud['keypairs'] = keyPairs
491 # Get the instances that belong to the given slice from sqlite3
492 # XXX use getOne() in production because the slice's hrn is supposed
493 # to be unique. For testing, uniqueness is turned off in the db.
494 # If the slice isn't found in the database, create a record for the
496 matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
498 theSlice = matchedSlices[-1]
500 theSlice = Slice(slice_hrn = hrn)
501 for instance in theSlice.instances:
502 instanceId.append(instance.instance_id)
504 # Get the information about those instances using their ids.
505 if len(instanceId) > 0:
506 reservations = conn.get_all_instances(instanceId)
509 for reservation in reservations:
510 for instance in reservation.instances:
511 instances.append(instance)
513 # Construct a dictionary for the EucaRSpecBuilder
515 for instance in instances:
516 instList = instancesDict.setdefault(instance.instance_type, [])
519 instInfoDict['id'] = instance.id
520 instInfoDict['public_dns'] = instance.public_dns_name
521 instInfoDict['state'] = instance.state
522 instInfoDict['key'] = instance.key_name
524 instList.append(instInfoDict)
525 AggregateManagerEucalyptus.cloud['instances'] = instancesDict
527 except EC2ResponseError, ec2RespErr:
528 errTree = ET.fromstring(ec2RespErr.body)
529 errMsgE = errTree.find('.//Message')
530 logger.error(errMsgE.text)
532 rspec = EucaRSpecBuilder(AggregateManagerEucalyptus.cloud).toXML()
534 # Remove the instances records so next time they won't
536 if 'instances' in AggregateManagerEucalyptus.cloud:
537 del AggregateManagerEucalyptus.cloud['instances']
542 Hook called via 'sfi.py create'
544 def CreateSliver(api, slice_xrn, creds, xml, users, options):
545 call_id = options.get('call_id')
546 if Callids().already_handled(call_id): return ""
548 logger = logging.getLogger('EucaAggregate')
549 logger.debug("In CreateSliver")
551 aggregate = PlAggregate(self.driver)
552 slices = PlSlices(self.driver)
553 (hrn, type) = urn_to_hrn(slice_xrn)
554 peer = slices.get_peer(hrn)
555 sfa_peer = slices.get_sfa_peer(hrn)
558 slice_record = users[0].get('slice_record', {})
560 conn = self.getEucaConnection()
562 logger.error('Cannot create a connection to Eucalyptus')
566 schemaXML = ET.parse(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA)
567 rspecValidator = ET.RelaxNG(schemaXML)
568 rspecXML = ET.XML(xml)
569 for network in rspecXML.iterfind("./network"):
570 if network.get('name') != AggregateManagerEucalyptus.cloud['name']:
571 # Throw away everything except my own RSpec
572 # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
573 network.getparent().remove(network)
574 if not rspecValidator(rspecXML):
575 error = rspecValidator.error_log.last_error
576 message = '%s (line %s)' % (error.message, error.line)
577 raise InvalidRSpec(message)
580 Create the sliver[s] (slice) at this aggregate.
581 Verify HRN and initialize the slice record in PLC if necessary.
584 # ensure site record exists
585 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
586 # ensure slice record exists
587 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
588 # ensure person records exists
589 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
591 # Get the slice from db or create one.
592 s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
594 s = Slice(slice_hrn = hrn)
596 # Process any changes in existing instance allocation
598 for sliceInst in s.instances:
599 pendingRmInst.append(sliceInst.instance_id)
600 existingInstGroup = rspecXML.findall(".//euca_instances")
601 for instGroup in existingInstGroup:
602 for existingInst in instGroup:
603 if existingInst.get('id') in pendingRmInst:
604 pendingRmInst.remove(existingInst.get('id'))
605 for inst in pendingRmInst:
606 dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
607 if dbInst.meta.state != 'deleted':
608 logger.debug('Instance %s will be terminated' % inst)
609 # Terminate instances one at a time for robustness
610 conn.terminate_instances([inst])
611 # Only change the state but do not remove the entry from the DB.
612 dbInst.meta.state = 'deleted'
613 #dbInst.destroySelf()
615 # Process new instance requests
616 requests = rspecXML.findall(".//request")
618 # Get all the public keys associate with slice.
622 logger.debug("Keys: %s" % user['keys'])
623 pubKeys = '\n'.join(keys)
624 logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
626 vmTypeElement = req.getparent()
627 instType = vmTypeElement.get('name')
628 numInst = int(req.find('instances').text)
630 bundleName = req.find('bundle').text
631 if not AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]:
632 logger.error('Cannot find bundle %s' % bundleName)
633 bundleInfo = AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]
634 instKernel = bundleInfo['kernelID']
635 instDiskImg = bundleInfo['imageID']
636 instRamDisk = bundleInfo['ramdiskID']
639 # Create the instances
640 for i in range(0, numInst):
641 eucaInst = EucaInstance(slice = s,
642 kernel_id = instKernel,
643 image_id = instDiskImg,
644 ramdisk_id = instRamDisk,
646 inst_type = instType,
647 meta = Meta(start_time=datetime.datetime.now()))
648 eucaInst.reserveInstance(conn, pubKeys)
650 # xxx - should return altered rspec
651 # with enough data for the client to understand what's happened
655 # Return information on the IP addresses bound to each slice's instances
657 def dumpInstanceInfo():
658 logger = logging.getLogger('EucaMeta')
659 outdir = "/var/www/html/euca/"
660 outfile = outdir + "instances.txt"
665 if e.errno != errno.EEXIST:
668 dbResults = Meta.select(
669 AND(Meta.q.pri_addr != None,
670 Meta.q.state == 'running')
672 dbResults = list(dbResults)
673 f = open(outfile, "w")
675 instId = r.instance.instance_id
677 hrn = r.instance.slice.slice_hrn
678 logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
679 f.write("%s %s %s\n" % (instId, ipaddr, hrn))
682 def GetVersion(api, options):
684 version_manager = VersionManager()
685 ad_rspec_versions = []
686 request_rspec_versions = []
687 for rspec_version in version_manager.versions:
688 if rspec_version.content_type in ['*', 'ad']:
689 ad_rspec_versions.append(rspec_version.to_dict())
690 if rspec_version.content_type in ['*', 'request']:
691 request_rspec_versions.append(rspec_version.to_dict())
693 version_more = {'interface':'aggregate',
698 'geni_request_rspec_versions': request_rspec_versions,
699 'geni_ad_rspec_versions': ad_rspec_versions,
701 return version_core(version_more)