1 from __future__ import with_statement
7 from multiprocessing import Process
11 from boto.ec2.regioninfo import RegionInfo
12 from boto.exception import EC2ResponseError
13 from ConfigParser import ConfigParser
14 from xmlbuilder import XMLBuilder
15 from lxml import etree as ET
16 from sqlobject import *
18 from sfa.util.faults import InvalidRSpec
19 from sfa.util.xrn import urn_to_hrn, Xrn
20 from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
21 from sfa.util.callids import Callids
22 #comes with its own logging
23 #from sfa.util.sfalogging import logger
24 from sfa.util.version import version_core
25 from sfa.trust.credential import Credential
26 from sfa.server.sfaapi import SfaApi
27 from sfa.plc.aggregate import Aggregate
28 from sfa.plc.slices import Slice, Slices
29 from sfa.rspecs.version_manager import VersionManager
30 from sfa.rspecs.rspec import RSpec
31 # not sure what this used to be nor where it is now defined
32 #from sfa.rspecs.sfa_rspec import sfa_rspec_version
35 # Meta data of an instance.
# SQLObject table holding the mutable metadata of one Eucalyptus instance:
# its lifecycle state, its addresses and its start time.
# NOTE(review): in this listing every line starts with the original file's
# line number and indentation has been lost; code lines are kept verbatim.
37 class Meta(SQLObject):
# Reverse side of EucaInstance.meta (EucaInstance declares ForeignKey('Meta')).
38 instance = SingleJoin('EucaInstance')
# Lifecycle state. Starts as 'new'; other code in this file sets it to
# 'running' and 'deleted'.
39 state = StringCol(default = 'new')
# Public and private addresses; None until they are filled in.
40 pub_addr = StringCol(default = None)
41 pri_addr = StringCol(default = None)
# Creation timestamp; None unless the creator supplies one.
42 start_time = DateTimeCol(default = None)
45 # A representation of an Eucalyptus instance. This is a support class
46 # for instance <-> slice mapping.
# SQLObject table describing one requested Eucalyptus instance and linking it
# to the slice that owns it and to its Meta row.
# NOTE(review): this listing has gaps (see the embedded line numbers); the
# `try:` line, the end of the run_instances(...) call and the tail of the
# except branch are not visible. Code lines are kept verbatim.
48 class EucaInstance(SQLObject):
# Instance id; stays None until reserveInstance() stores the id it got back.
49 instance_id = StringCol(unique=True, default=None)
# Image ids, instance type and key pair name used for the reservation request.
50 kernel_id = StringCol()
51 image_id = StringCol()
52 ramdisk_id = StringCol()
53 inst_type = StringCol()
54 key_pair = StringCol()
# Owning slice row and the associated metadata row.
55 slice = ForeignKey('Slice')
56 meta = ForeignKey('Meta')
59 # Contacts Eucalyptus and tries to reserve this instance.
61 # @param botoConn A connection to Eucalyptus.
62 # @param pubKeys A list of public keys for the instance.
# NOTE(review): CreateSliver in this file passes a single newline-joined
# string for pubKeys, not a list. How pubKeys is used is not visible here
# (the remaining argument(s) of run_instances are missing from the listing).
64 def reserveInstance(self, botoConn, pubKeys):
65 logger = logging.getLogger('EucaAggregate')
66 logger.info('Reserving an instance: image: %s, kernel: ' \
67 '%s, ramdisk: %s, type: %s, key: %s' % \
68 (self.image_id, self.kernel_id, self.ramdisk_id,
69 self.inst_type, self.key_pair))
# Ask Eucalyptus to start the instance with this row's image/kernel/ramdisk,
# instance type and key pair.
72 reservation = botoConn.run_instances(self.image_id,
73 kernel_id = self.kernel_id,
74 ramdisk_id = self.ramdisk_id,
75 instance_type = self.inst_type,
76 key_name = self.key_pair,
# Record the returned id; if the reservation holds several instances only
# the last id survives, since each iteration overwrites instance_id.
78 for instance in reservation.instances:
79 self.instance_id = instance.id
81 # If there is an error, destroy itself.
# Python 2 `except X, name` syntax. The error body is parsed as XML and its
# <Message> text is logged.
# NOTE(review): find() returns None when no Message element exists, in which
# case msg.text would raise AttributeError -- confirm the error format.
82 except EC2ResponseError, ec2RespErr:
83 errTree = ET.fromstring(ec2RespErr.body)
84 msg = errTree.find('.//Message')
85 logger.error(msg.text)
89 # A representation of a PlanetLab slice. This is a support class
90 # for instance <-> slice mapping.
# SQLObject table with one row per slice, identified by its HRN.
# NOTE(review): the name `Slice` is also imported from sfa.plc.slices at the
# top of the file; this class definition rebinds it. Confirm that is intended.
92 class Slice(SQLObject):
# Human-readable name of the slice. No uniqueness constraint is declared
# (the index below is commented out).
93 slice_hrn = StringCol()
94 #slice_index = DatabaseIndex('slice_hrn')
# All EucaInstance rows whose `slice` foreign key points at this row.
95 instances = MultipleJoin('EucaInstance')
98 # A class that builds the RSpec for Eucalyptus.
# Builds the Eucalyptus RSpec XML document from the `cloud` dictionary using
# XMLBuilder's `with`-block element syntax.
# NOTE(review): this listing has many gaps (see the embedded line numbers).
# Most element bodies, the binding of the local name `xml` used throughout,
# and the `def` line of the generator method are not visible. Presumably
# `xml` is bound to self.eucaRSpec in each method -- confirm against the
# full file. Code lines are kept verbatim.
100 class EucaRSpecBuilder(object):
102 # Initializes an RSpec builder
104 # @param cloud A dictionary containing data about a
105 # cloud (ex. clusters, ip)
106 def __init__(self, cloud):
# The XML document under construction and the data it is built from.
107 self.eucaRSpec = XMLBuilder(format = True, tab_step = " ")
108 self.cloudInfo = cloud
111 # Creates a request stanza.
113 # @param num The number of instances to create.
114 # @param image The disk image id.
115 # @param kernel The kernel image id.
116 # @param keypair Key pair to embed.
117 # @param ramdisk Ramdisk id (optional).
119 def __requestXML(self, num, image, kernel, keypair, ramdisk = ''):
# Only the kernel, ramdisk and disk image elements are visible here; how
# `num` and `keypair` are emitted cannot be seen in this listing.
124 with xml.kernel_image(id=kernel):
130 with xml.ramdisk(id=ramdisk):
132 with xml.disk_image(id=image):
138 # Creates the cluster stanza.
140 # @param clusters Clusters information.
# Each cluster is a dict with 'name' and 'instances'; each entry of
# 'instances' is indexed positionally, with [0] used as the vm type name.
142 def __clustersXML(self, clusters):
143 cloud = self.cloudInfo
146 for cluster in clusters:
147 instances = cluster['instances']
148 with xml.cluster(id=cluster['name']):
152 for inst in instances:
153 with xml.vm_type(name=inst[0]):
# The values written inside max_instances / memory / disk_space are not
# visible in this listing; only the declared units (MB, GB) are.
156 with xml.max_instances:
160 with xml.memory(unit='MB'):
162 with xml.disk_space(unit='GB'):
# cloud['instances'], when present, maps a vm type name to a list of dicts
# describing running instances; 'id', 'state' and 'public_dns' are read here.
164 if 'instances' in cloud and inst[0] in cloud['instances']:
165 existingEucaInstances = cloud['instances'][inst[0]]
166 with xml.euca_instances:
167 for eucaInst in existingEucaInstances:
168 with xml.euca_instance(id=eucaInst['id']):
170 xml << eucaInst['state']
172 xml << eucaInst['public_dns']
# Emits one <bundle> element per key of `bundles`; the element contents are
# not visible in this listing.
174 def __imageBundleXML(self, bundles):
177 for bundle in bundles.keys():
178 with xml.bundle(id=bundle):
182 # Creates the Images stanza.
184 # @param images A list of images in Eucalyptus.
# Reads .id, .architecture and .location from each image object.
186 def __imagesXML(self, images):
190 with xml.image(id=image.id):
194 xml << image.architecture
198 xml << image.location
201 # Creates the KeyPairs stanza.
203 # @param keypairs A list of key pairs in Eucalyptus.
# The body of this method is not visible in this listing.
205 def __keyPairsXML(self, keypairs):
213 # Generates the RSpec.
# NOTE(review): the `def` line is missing from the listing; other code in
# this file calls `.toXML()` on the builder, so this is presumably toXML(self).
216 logger = logging.getLogger('EucaAggregate')
# Without cloud data there is nothing to render; what happens after the
# error is logged is not visible here.
217 if not self.cloudInfo:
218 logger.error('No cloud information')
222 cloud = self.cloudInfo
223 with xml.RSpec(type='eucalyptus'):
224 with xml.network(name=cloud['name']):
# Key pairs and raw images are deliberately left out of the document; only
# image bundles and clusters are rendered.
227 #self.__keyPairsXML(cloud['keypairs'])
228 #self.__imagesXML(cloud['images'])
229 self.__imageBundleXML(cloud['imageBundles'])
230 self.__clustersXML(cloud['clusters'])
234 # A parser to parse the output of availability-zones.
236 # Note: Only one cluster is supported. If more than one, this will
# Turns the zone list returned by a verbose availability-zones query into a
# list of cluster dictionaries.
# NOTE(review): this listing has gaps (see the embedded line numbers); the
# initialisation of self.zones, cluster, instList and clusterList, the header
# of the parsing method and its return are not visible. Code lines are kept
# verbatim.
239 class ZoneResultParser(object):
240 def __init__(self, zones):
# Fewer than three entries means there is no per-type data to parse (entry 0
# is read as the cluster itself and parsing starts at entry 2); the action
# taken in that case is not visible here.
244 if len(self.zones) < 3:
# Entry 0 describes the cluster: its name, with the address taken from the
# entry's `state` field.
250 cluster['name'] = self.zones[0].name
251 cluster['ip'] = self.zones[0].state
# Entry 1 is skipped; entries from index 2 on are parsed as one instance
# type each.
253 for i in range(2, len(self.zones)):
254 currZone = self.zones[i]
# The instance type is the second whitespace-separated word of the name.
255 instType = currZone.name.split()[1]
# `state` is parsed as "<free> / <max> <cpu> <ram> <disk>": the part before
# '/' is the free count, the part after holds four whitespace-separated ints.
257 stateString = currZone.state.split('/')
258 rscString = stateString[1].split()
260 instFree = int(stateString[0])
261 instMax = int(rscString[0])
262 instNumCpu = int(rscString[1])
263 instRam = int(rscString[2])
264 instDiskSpace = int(rscString[3])
# Positional tuple; consumers index it (element 0 is the type name).
266 instTuple = (instType, instFree, instMax, instNumCpu, instRam, instDiskSpace)
267 instList.append(instTuple)
268 cluster['instances'] = instList
269 clusterList.append(cluster)
# Aggregate manager backed by a Eucalyptus cloud. Shared state is kept in
# class-level attributes and accessed as AggregateManagerEucalyptus.<name>.
# NOTE(review): this listing has gaps (see the embedded line numbers); the
# definitions of the `cloud` and `_inited` class attributes and the `def`
# line of the constructor are not visible. Code lines are kept verbatim.
273 class AggregateManagerEucalyptus:
275 # The data structure used to represent a cloud.
276 # It contains the cloud name, its ip address, image information,
277 # key pairs, and clusters information.
280 # The location of the RelaxNG schema.
281 EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
285 # the init_server mechanism has vanished
# Presumably the constructor body: run init_server() once, guarded by the
# class-level _inited flag. Where _inited is set is not visible here.
287 if AggregateManagerEucalyptus._inited: return
288 AggregateManagerEucalyptus.init_server()
290 # Initialize the aggregate manager by reading a configuration file.
# NOTE(review): the `def` line is missing from this listing (presumably
# init_server, which the class invokes as
# AggregateManagerEucalyptus.init_server()). Several other lines are missing
# too; code lines are kept verbatim.
# Steps visible here: set up file logging, read the cloud definition, build
# the image bundle table, open the sqlite database, start the metadata
# updater process and check that the RSpec schema file exists.
293 logger = logging.getLogger('EucaAggregate')
294 fileHandler = logging.FileHandler('/var/log/euca.log')
295 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
# NOTE(review): a new handler is attached on every run of this code; if it
# can run more than once per process, log lines will be duplicated.
296 logger.addHandler(fileHandler)
297 fileHandler.setLevel(logging.DEBUG)
298 logger.setLevel(logging.DEBUG)
# The system-wide config is tried first, then one in the working directory.
300 configParser = ConfigParser()
301 configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
302 if len(configParser.sections()) < 1:
303 logger.error('No cloud defined in the config file')
304 raise Exception('Cannot find cloud definition in configuration file.')
306 # Only read the first section.
# The section name doubles as the cloud name.
307 cloudSec = configParser.sections()[0]
308 AggregateManagerEucalyptus.cloud['name'] = cloudSec
309 AggregateManagerEucalyptus.cloud['access_key'] = configParser.get(cloudSec, 'access_key')
310 AggregateManagerEucalyptus.cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
311 AggregateManagerEucalyptus.cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url')
# Strip the scheme, then take the text before ':' as the cloud's address.
# NOTE(review): the two-way unpack of split(':') raises ValueError when the
# URL has no port or contains more than one ':' -- confirm the URL format.
312 cloudURL = AggregateManagerEucalyptus.cloud['cloud_url']
313 if cloudURL.find('https://') >= 0:
314 cloudURL = cloudURL.replace('https://', '')
315 elif cloudURL.find('http://') >= 0:
316 cloudURL = cloudURL.replace('http://', '')
317 (AggregateManagerEucalyptus.cloud['ip'], parts) = cloudURL.split(':')
319 # Create image bundles
# NOTE(review): `self` is used here although the class invokes this code
# without an instance; confirm `self` is actually bound. getEucaConnection is
# documented as possibly returning None, which would make the chained call fail.
320 images = self.getEucaConnection().get_all_images()
321 AggregateManagerEucalyptus.cloud['images'] = images
322 AggregateManagerEucalyptus.cloud['imageBundles'] = {}
# Only machine images that name a kernel become bundles. A bundle is keyed by
# the directory part of the image location and records the image, kernel and
# ramdisk ids. (The loop header over `images` is not visible in this listing.)
324 if i.type != 'machine' or i.kernel_id is None: continue
325 name = os.path.dirname(i.location)
326 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
327 AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
329 # Initialize sqlite3 database and tables.
330 dbPath = '/etc/sfa/db'
331 dbName = 'euca_aggregate.db'
# The directory creation itself is not visible in this listing.
333 if not os.path.isdir(dbPath):
334 logger.info('%s not found. Creating directory ...' % dbPath)
# Make this connection the process-wide SQLObject default and create the
# three tables if they do not exist yet.
337 conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
338 sqlhub.processConnection = conn
339 Slice.createTable(ifNotExists=True)
340 EucaInstance.createTable(ifNotExists=True)
341 Meta.createTable(ifNotExists=True)
343 # Start the update process to keep track of the meta data
344 # about Eucalyptus instance.
345 Process(target=AggregateManagerEucalyptus.updateMeta).start()
347 # Make sure the schema exists.
# NOTE(review): the message reads 'Cannot location schema' (likely meant
# 'locate'); what is done with `err` is not visible here.
348 if not os.path.exists(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA):
349 err = 'Cannot location schema at %s' % AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA
354 # A separate process that will update the meta data.
# NOTE(review): the `def` line is missing from this listing (presumably
# updateMeta, which is started as a multiprocessing Process target). Loop
# headers and several initialisations are missing too; code lines are kept
# verbatim.
# Visible behaviour: find instances without a private address, ask Eucalyptus
# for their addresses, store them in the local DB and dump the result.
358 logger = logging.getLogger('EucaMeta')
359 fileHandler = logging.FileHandler('/var/log/euca_meta.log')
360 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
361 logger.addHandler(fileHandler)
362 fileHandler.setLevel(logging.DEBUG)
363 logger.setLevel(logging.DEBUG)
368 # Get IDs of the instances that don't have IPs yet.
# `== None` / `!=` build an SQLObject query expression here and are not
# ordinary Python comparisons; do not rewrite them as `is None`.
369 dbResults = Meta.select(
370 AND(Meta.q.pri_addr == None,
371 Meta.q.state != 'deleted')
373 dbResults = list(dbResults)
374 logger.debug('[update process] dbResults: %s' % dbResults)
# NOTE(review): instance_id defaults to None on EucaInstance; a None entry
# would make the ', '.join(...) below raise TypeError -- confirm it cannot occur.
379 instids.append(r.instance.instance_id)
380 logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
382 # Get instance information from Eucalyptus
# NOTE(review): `self` is used although this code is started via the class
# (Process target); confirm `self` is actually bound.
383 conn = self.getEucaConnection()
# Flatten all reservations into one list of instance objects.
385 reservations = conn.get_all_instances(instids)
386 for reservation in reservations:
387 vmInstances += reservation.instances
# Keep only instances whose private address is no longer the placeholder
# '0.0.0.0'.
390 instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
391 for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
392 logger.debug('[update process] IP dict: %s' % str(instIPs))
394 # Update the local DB
395 for ipData in instIPs:
# getOne(None) yields None instead of raising when no row matches; the
# branch that tests for it is not visible in this listing.
396 dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
398 logger.info('[update process] Could not find %s in DB' % ipData['id'])
400 dbInst.meta.pri_addr = ipData['pri_addr']
401 dbInst.meta.pub_addr = ipData['pub_addr']
402 dbInst.meta.state = 'running'
# NOTE(review): the method defined in this file is spelled dumpInstanceInfo
# (capital I); this lowercase spelling would raise AttributeError.
404 self.dumpinstanceInfo()
407 # Creates a connection to Eucalyptus. This function is inspired by
408 # the make_connection() in Euca2ools.
410 # @return A connection object or None
# Opens a boto EC2 connection to the cloud described by the class-level
# `cloud` dictionary (access key, secret key, cloud URL).
# NOTE(review): this listing has gaps (see the embedded line numbers); the
# default values for SSL/port/path, the early return after the error log and
# the trailing connect_ec2 arguments are not visible. Code lines are kept
# verbatim.
# NOTE(review): the function is declared without parameters, yet the rest of
# the file calls it as self.getEucaConnection(); unless a decorator on a
# missing line makes it static, that call would fail -- confirm.
412 def getEucaConnection():
413 accessKey = AggregateManagerEucalyptus.cloud['access_key']
414 secretKey = AggregateManagerEucalyptus.cloud['secret_key']
415 eucaURL = AggregateManagerEucalyptus.cloud['cloud_url']
419 logger = logging.getLogger('EucaAggregate')
# All three settings are required. The message mentions environment
# variables, but the values are read from the `cloud` dictionary.
421 if not accessKey or not secretKey or not eucaURL:
422 logger.error('Please set ALL of the required environment ' \
423 'variables by sourcing the eucarc file.')
426 # Split the url into parts
# Drop the scheme; the lines that presumably record whether SSL is used are
# missing from this listing.
427 if eucaURL.find('https://') >= 0:
429 eucaURL = eucaURL.replace('https://', '')
430 elif eucaURL.find('http://') >= 0:
432 eucaURL = eucaURL.replace('http://', '')
# NOTE(review): the two-way unpack raises ValueError unless exactly one ':'
# remains in the URL.
433 (eucaHost, parts) = eucaURL.split(':')
# What follows the ':' is "<port>/<path...>": the first piece is the port,
# the pieces are joined back into the service path.
435 parts = parts.split('/')
436 eucaPort = int(parts[0])
438 srvPath = '/'.join(parts)
440 return boto.connect_ec2(aws_access_key_id=accessKey,
441 aws_secret_access_key=secretKey,
443 region=RegionInfo(None, 'eucalyptus', eucaHost),
# Builds the RSpec describing this cloud and, when a slice URN is given in
# `options`, the instances the slice currently holds.
# @param api      SFA API object (not referenced in the visible lines).
# @param creds    Credential strings; creds[0] is used to derive the caller HRN.
# @param options  Dict; 'geni_slice_urn' and 'origin_hrn' are read.
# @param call_id  Used to drop calls that were already handled.
# NOTE(review): this listing has gaps (see the embedded line numbers); `try:`,
# several `if`/`else` lines, list initialisations and the final return are not
# visible. Code lines are kept verbatim.
# NOTE(review): the signature has no `self`, yet the body calls
# self.getEucaConnection() -- confirm how this is bound.
447 def ListResources(api, creds, options, call_id):
448 if Callids().already_handled(call_id): return ""
449 # get slice's hrn from options
# NOTE(review): the local name `type` shadows the builtin.
450 xrn = options.get('geni_slice_urn', '')
451 hrn, type = urn_to_hrn(xrn)
452 logger = logging.getLogger('EucaAggregate')
454 # get hrn of the original caller
# Falls back to the HRN in the first credential; the condition guarding the
# fallback is not visible here.
455 origin_hrn = options.get('origin_hrn', None)
457 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
459 conn = self.getEucaConnection()
# On connection failure the error text itself is returned in place of an RSpec.
462 logger.error('Cannot create a connection to Eucalyptus')
463 return 'Cannot create a connection to Eucalyptus'
# Refresh cluster information from the verbose zone listing.
467 zones = conn.get_all_zones(['verbose'])
468 p = ZoneResultParser(zones)
470 AggregateManagerEucalyptus.cloud['clusters'] = clusters
# Refresh images and rebuild the bundle table. This repeats the bundle logic
# used when the manager is initialised.
473 images = conn.get_all_images()
474 AggregateManagerEucalyptus.cloud['images'] = images
475 AggregateManagerEucalyptus.cloud['imageBundles'] = {}
477 if i.type != 'machine' or i.kernel_id is None: continue
478 name = os.path.dirname(i.location)
479 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
480 AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
483 keyPairs = conn.get_all_key_pairs()
484 AggregateManagerEucalyptus.cloud['keypairs'] = keyPairs
490 # Get the instances that belong to the given slice from sqlite3
491 # XXX use getOne() in production because the slice's hrn is supposed
492 # to be unique. For testing, uniqueness is turned off in the db.
493 # If the slice isn't found in the database, create a record for the
# When several rows match, the last one is used.
495 matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
497 theSlice = matchedSlices[-1]
499 theSlice = Slice(slice_hrn = hrn)
500 for instance in theSlice.instances:
501 instanceId.append(instance.instance_id)
503 # Get the information about those instances using their ids.
504 if len(instanceId) > 0:
505 reservations = conn.get_all_instances(instanceId)
508 for reservation in reservations:
509 for instance in reservation.instances:
510 instances.append(instance)
512 # Construct a dictionary for the EucaRSpecBuilder
# Group per-instance info dicts by instance type; the builder looks them up
# under cloud['instances'][<type name>].
514 for instance in instances:
515 instList = instancesDict.setdefault(instance.instance_type, [])
518 instInfoDict['id'] = instance.id
519 instInfoDict['public_dns'] = instance.public_dns_name
520 instInfoDict['state'] = instance.state
521 instInfoDict['key'] = instance.key_name
523 instList.append(instInfoDict)
# NOTE(review): per-request data is stored in the class-level `cloud` dict
# and removed again below, so overlapping requests would share it -- confirm
# requests are serialised.
524 AggregateManagerEucalyptus.cloud['instances'] = instancesDict
# Python 2 `except X, name` syntax. EC2 errors are logged and not re-raised
# in the visible lines.
# NOTE(review): find() returns None when no Message element exists, making
# `.text` fail.
526 except EC2ResponseError, ec2RespErr:
527 errTree = ET.fromstring(ec2RespErr.body)
528 errMsgE = errTree.find('.//Message')
529 logger.error(errMsgE.text)
531 rspec = EucaRSpecBuilder(AggregateManagerEucalyptus.cloud).toXML()
533 # Remove the instances records so next time they won't
535 if 'instances' in AggregateManagerEucalyptus.cloud:
536 del AggregateManagerEucalyptus.cloud['instances']
541 Hook called via 'sfi.py create'
# Applies a request RSpec to a slice: validates it, ensures the site, slice
# and person records exist, terminates instances no longer listed, and
# reserves newly requested instances.
# @param api        SFA API object, passed to Aggregate.
# @param slice_xrn  URN of the slice; converted to an HRN.
# @param creds      Credentials (not referenced in the visible lines).
# @param xml        The request RSpec as an XML string.
# @param users      List of user dicts; users[0]['slice_record'] and each
#                   user's 'keys' are read.
# @param call_id    Used to drop calls that were already handled.
# NOTE(review): this listing has gaps (see the embedded line numbers); several
# `if`/`for` headers, initialisations, the docstring quotes and the final
# return are not visible. Code lines are kept verbatim.
# NOTE(review): the signature has no `self`, yet the body calls
# self.getEucaConnection() -- confirm how this is bound.
543 def CreateSliver(api, slice_xrn, creds, xml, users, call_id):
544 if Callids().already_handled(call_id): return ""
546 logger = logging.getLogger('EucaAggregate')
547 logger.debug("In CreateSliver")
549 aggregate = Aggregate(api)
# NOTE(review): `slices` is not assigned in any visible line; presumably it
# is created on a missing line. `type` shadows the builtin.
551 (hrn, type) = urn_to_hrn(slice_xrn)
552 peer = slices.get_peer(hrn)
553 sfa_peer = slices.get_sfa_peer(hrn)
# NOTE(review): users[0] raises IndexError for an empty list unless a guard
# sits on a missing line.
556 slice_record = users[0].get('slice_record', {})
558 conn = self.getEucaConnection()
560 logger.error('Cannot create a connection to Eucalyptus')
# Validate the request against the RelaxNG schema after dropping every
# <network> element that does not belong to this cloud.
564 schemaXML = ET.parse(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA)
565 rspecValidator = ET.RelaxNG(schemaXML)
566 rspecXML = ET.XML(xml)
# NOTE(review): elements are removed from the tree while iterating over it,
# which may skip siblings -- confirm with multi-network requests.
567 for network in rspecXML.iterfind("./network"):
568 if network.get('name') != AggregateManagerEucalyptus.cloud['name']:
569 # Throw away everything except my own RSpec
570 # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
571 network.getparent().remove(network)
572 if not rspecValidator(rspecXML):
573 error = rspecValidator.error_log.last_error
574 message = '%s (line %s)' % (error.message, error.line)
575 raise InvalidRSpec(message)
578 Create the sliver[s] (slice) at this aggregate.
579 Verify HRN and initialize the slice record in PLC if necessary.
582 # ensure site record exists
583 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
584 # ensure slice record exists
# NOTE(review): the local name `slice` shadows the builtin.
585 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
586 # ensure person records exist
587 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
589 # Get the slice from db or create one.
# getOne(None) yields None when no row matches; the test for that case is on
# a missing line.
590 s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
592 s = Slice(slice_hrn = hrn)
594 # Process any changes in existing instance allocation
# Start from every instance the slice owns, then drop those still listed
# under <euca_instances> in the request; whatever remains gets terminated.
596 for sliceInst in s.instances:
597 pendingRmInst.append(sliceInst.instance_id)
598 existingInstGroup = rspecXML.findall(".//euca_instances")
599 for instGroup in existingInstGroup:
600 for existingInst in instGroup:
601 if existingInst.get('id') in pendingRmInst:
602 pendingRmInst.remove(existingInst.get('id'))
603 for inst in pendingRmInst:
# NOTE(review): dbInst can be None here (getOne(None)); `.meta` would then
# raise AttributeError.
604 dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
605 if dbInst.meta.state != 'deleted':
606 logger.debug('Instance %s will be terminated' % inst)
607 # Terminate instances one at a time for robustness
608 conn.terminate_instances([inst])
609 # Only change the state but do not remove the entry from the DB.
610 dbInst.meta.state = 'deleted'
611 #dbInst.destroySelf()
613 # Process new instance requests
614 requests = rspecXML.findall(".//request")
616 # Get all the public keys associated with the slice.
# The keys are joined into one newline-separated string before being handed
# to reserveInstance.
620 logger.debug("Keys: %s" % user['keys'])
621 pubKeys = '\n'.join(keys)
622 logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
# For each <request>: the parent element's name is the instance type,
# <instances> holds the count and <bundle> names the image bundle.
624 vmTypeElement = req.getparent()
625 instType = vmTypeElement.get('name')
626 numInst = int(req.find('instances').text)
628 bundleName = req.find('bundle').text
# NOTE(review): indexing with an unknown bundle name raises KeyError before
# the error can be logged, and the bundle is indexed again right after the
# log call; a membership test was probably intended -- confirm.
629 if not AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]:
630 logger.error('Cannot find bundle %s' % bundleName)
631 bundleInfo = AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]
632 instKernel = bundleInfo['kernelID']
633 instDiskImg = bundleInfo['imageID']
634 instRamDisk = bundleInfo['ramdiskID']
637 # Create the instances
# One DB row and one reservation attempt per requested instance. The
# key_pair argument is on a line missing from this listing.
638 for i in range(0, numInst):
639 eucaInst = EucaInstance(slice = s,
640 kernel_id = instKernel,
641 image_id = instDiskImg,
642 ramdisk_id = instRamDisk,
644 inst_type = instType,
645 meta = Meta(start_time=datetime.datetime.now()))
646 eucaInst.reserveInstance(conn, pubKeys)
648 # xxx - should return altered rspec
649 # with enough data for the client to understand what's happened
653 # Return information on the IP addresses bound to each slice's instances
# Writes one line per running instance (instance id, address, slice HRN) to
# /var/www/html/euca/instances.txt.
# NOTE(review): this listing has gaps (see the embedded line numbers); the
# directory creation, the loop header, the assignment of `ipaddr` and any
# close of the file are not visible. Code lines are kept verbatim.
# NOTE(review): the function is declared without parameters, while the
# updater code calls it through `self` (and with a differently-cased name).
655 def dumpInstanceInfo():
656 logger = logging.getLogger('EucaMeta')
657 outdir = "/var/www/html/euca/"
658 outfile = outdir + "instances.txt"
# An "already exists" error from creating the output directory is tolerated;
# the handling of other errors is on a missing line.
663 if e.errno != errno.EEXIST:
# `!= None` / `==` build an SQLObject query expression here and are not
# ordinary Python comparisons; do not rewrite them as `is not None`.
666 dbResults = Meta.select(
667 AND(Meta.q.pri_addr != None,
668 Meta.q.state == 'running')
670 dbResults = list(dbResults)
# The file is rewritten from scratch on every call.
# NOTE(review): no close() is visible; confirm the handle is released.
671 f = open(outfile, "w")
673 instId = r.instance.instance_id
675 hrn = r.instance.slice.slice_hrn
676 logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
677 f.write("%s %s %s\n" % (instId, ipaddr, hrn))
# Builds the version description of this aggregate: the RSpec versions it
# accepts as requests and advertises, plus the default advertisement version.
# NOTE(review): the `def` line is missing from this listing (presumably a
# GetVersion method); some entries of the `version_more` dict and its closing
# brace are missing too. Code lines are kept verbatim.
682 version_manager = VersionManager()
683 ad_rspec_versions = []
684 request_rspec_versions = []
# A version whose content_type is '*' counts as both an advertisement and a
# request format, hence two independent `if`s rather than if/elif.
685 for rspec_version in version_manager.versions:
686 if rspec_version.content_type in ['*', 'ad']:
687 ad_rspec_versions.append(rspec_version.to_dict())
688 if rspec_version.content_type in ['*', 'request']:
689 request_rspec_versions.append(rspec_version.to_dict())
# The default advertisement format is looked up by the name "sfa 1".
690 default_rspec_version = version_manager.get_version("sfa 1").to_dict()
692 version_more = {'interface':'aggregate',
695 'request_rspec_versions': request_rspec_versions,
696 'ad_rspec_versions': ad_rspec_versions,
697 'default_ad_rspec': default_rspec_version
# version_core receives these aggregate-specific fields and its result is
# returned as-is.
699 return version_core(version_more)