1 from __future__ import with_statement
7 from multiprocessing import Process
11 from boto.ec2.regioninfo import RegionInfo
12 from boto.exception import EC2ResponseError
13 from ConfigParser import ConfigParser
14 from xmlbuilder import XMLBuilder
15 from lxml import etree as ET
16 from sqlobject import *
18 from sfa.util.faults import InvalidRSpec
19 from sfa.util.xrn import urn_to_hrn, Xrn
20 from sfa.util.plxrn import slicename_to_hrn
21 from sfa.util.callids import Callids
22 #comes with its own logging
23 #from sfa.util.sfalogging import logger
24 from sfa.util.version import version_core
26 from sfa.trust.credential import Credential
27 from sfa.planetlab.plaggregate import PlAggregate
28 from sfa.planetlab.plslices import PlSlices
29 from sfa.rspecs.version_manager import VersionManager
30 from sfa.rspecs.rspec import RSpec
33 # Meta data of an instance.
35 class Meta(SQLObject):
36 instance = SingleJoin('EucaInstance')
37 state = StringCol(default = 'new')
38 pub_addr = StringCol(default = None)
39 pri_addr = StringCol(default = None)
40 start_time = DateTimeCol(default = None)
43 # A representation of an Eucalyptus instance. This is a support class
44 # for instance <-> slice mapping.
46 class EucaInstance(SQLObject):
47 instance_id = StringCol(unique=True, default=None)
48 kernel_id = StringCol()
49 image_id = StringCol()
50 ramdisk_id = StringCol()
51 inst_type = StringCol()
52 key_pair = StringCol()
53 slice = ForeignKey('Slice')
54 meta = ForeignKey('Meta')
57 # Contacts Eucalyptus and tries to reserve this instance.
59 # @param botoConn A connection to Eucalyptus.
60 # @param pubKeys A list of public keys for the instance.
62 def reserveInstance(self, botoConn, pubKeys):
63 logger = logging.getLogger('EucaAggregate')
64 logger.info('Reserving an instance: image: %s, kernel: ' \
65 '%s, ramdisk: %s, type: %s, key: %s' % \
66 (self.image_id, self.kernel_id, self.ramdisk_id,
67 self.inst_type, self.key_pair))
70 reservation = botoConn.run_instances(self.image_id,
71 kernel_id = self.kernel_id,
72 ramdisk_id = self.ramdisk_id,
73 instance_type = self.inst_type,
74 key_name = self.key_pair,
76 for instance in reservation.instances:
77 self.instance_id = instance.id
79 # If there is an error, destroy itself.
80 except EC2ResponseError, ec2RespErr:
81 errTree = ET.fromstring(ec2RespErr.body)
82 msg = errTree.find('.//Message')
83 logger.error(msg.text)
87 # A representation of a PlanetLab slice. This is a support class
88 # for instance <-> slice mapping.
90 class Slice(SQLObject):
91 slice_hrn = StringCol()
92 #slice_index = DatabaseIndex('slice_hrn')
93 instances = MultipleJoin('EucaInstance')
96 # A class that builds the RSpec for Eucalyptus.
98 class EucaRSpecBuilder(object):
100 # Initizes a RSpec builder
102 # @param cloud A dictionary containing data about a
103 # cloud (ex. clusters, ip)
104 def __init__(self, cloud):
105 self.eucaRSpec = XMLBuilder(format = True, tab_step = " ")
106 self.cloudInfo = cloud
109 # Creates a request stanza.
111 # @param num The number of instances to create.
112 # @param image The disk image id.
113 # @param kernel The kernel image id.
114 # @param keypair Key pair to embed.
115 # @param ramdisk Ramdisk id (optional).
117 def __requestXML(self, num, image, kernel, keypair, ramdisk = ''):
122 with xml.kernel_image(id=kernel):
128 with xml.ramdisk(id=ramdisk):
130 with xml.disk_image(id=image):
136 # Creates the cluster stanza.
138 # @param clusters Clusters information.
140 def __clustersXML(self, clusters):
141 cloud = self.cloudInfo
144 for cluster in clusters:
145 instances = cluster['instances']
146 with xml.cluster(id=cluster['name']):
150 for inst in instances:
151 with xml.vm_type(name=inst[0]):
154 with xml.max_instances:
158 with xml.memory(unit='MB'):
160 with xml.disk_space(unit='GB'):
162 if 'instances' in cloud and inst[0] in cloud['instances']:
163 existingEucaInstances = cloud['instances'][inst[0]]
164 with xml.euca_instances:
165 for eucaInst in existingEucaInstances:
166 with xml.euca_instance(id=eucaInst['id']):
168 xml << eucaInst['state']
170 xml << eucaInst['public_dns']
172 def __imageBundleXML(self, bundles):
175 for bundle in bundles.keys():
176 with xml.bundle(id=bundle):
180 # Creates the Images stanza.
182 # @param images A list of images in Eucalyptus.
184 def __imagesXML(self, images):
188 with xml.image(id=image.id):
192 xml << image.architecture
196 xml << image.location
199 # Creates the KeyPairs stanza.
201 # @param keypairs A list of key pairs in Eucalyptus.
203 def __keyPairsXML(self, keypairs):
211 # Generates the RSpec.
214 logger = logging.getLogger('EucaAggregate')
215 if not self.cloudInfo:
216 logger.error('No cloud information')
220 cloud = self.cloudInfo
221 with xml.RSpec(type='eucalyptus'):
222 with xml.network(name=cloud['name']):
225 #self.__keyPairsXML(cloud['keypairs'])
226 #self.__imagesXML(cloud['images'])
227 self.__imageBundleXML(cloud['imageBundles'])
228 self.__clustersXML(cloud['clusters'])
232 # A parser to parse the output of availability-zones.
234 # Note: Only one cluster is supported. If more than one, this will
237 class ZoneResultParser(object):
238 def __init__(self, zones):
242 if len(self.zones) < 3:
248 cluster['name'] = self.zones[0].name
249 cluster['ip'] = self.zones[0].state
251 for i in range(2, len(self.zones)):
252 currZone = self.zones[i]
253 instType = currZone.name.split()[1]
255 stateString = currZone.state.split('/')
256 rscString = stateString[1].split()
258 instFree = int(stateString[0])
259 instMax = int(rscString[0])
260 instNumCpu = int(rscString[1])
261 instRam = int(rscString[2])
262 instDiskSpace = int(rscString[3])
264 instTuple = (instType, instFree, instMax, instNumCpu, instRam, instDiskSpace)
265 instList.append(instTuple)
266 cluster['instances'] = instList
267 clusterList.append(cluster)
271 class AggregateManagerEucalyptus:
273 # The data structure used to represent a cloud.
274 # It contains the cloud name, its ip address, image information,
275 # key pairs, and clusters information.
278 # The location of the RelaxNG schema.
279 EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
283 # the init_server mechanism has vanished
284 def __init__ (self, config):
285 if AggregateManagerEucalyptus._inited: return
286 AggregateManagerEucalyptus.init_server()
288 # Initialize the aggregate manager by reading a configuration file.
291 logger = logging.getLogger('EucaAggregate')
292 fileHandler = logging.FileHandler('/var/log/euca.log')
293 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
294 logger.addHandler(fileHandler)
295 fileHandler.setLevel(logging.DEBUG)
296 logger.setLevel(logging.DEBUG)
298 configParser = ConfigParser()
299 configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
300 if len(configParser.sections()) < 1:
301 logger.error('No cloud defined in the config file')
302 raise Exception('Cannot find cloud definition in configuration file.')
304 # Only read the first section.
305 cloudSec = configParser.sections()[0]
306 AggregateManagerEucalyptus.cloud['name'] = cloudSec
307 AggregateManagerEucalyptus.cloud['access_key'] = configParser.get(cloudSec, 'access_key')
308 AggregateManagerEucalyptus.cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
309 AggregateManagerEucalyptus.cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url')
310 cloudURL = AggregateManagerEucalyptus.cloud['cloud_url']
311 if cloudURL.find('https://') >= 0:
312 cloudURL = cloudURL.replace('https://', '')
313 elif cloudURL.find('http://') >= 0:
314 cloudURL = cloudURL.replace('http://', '')
315 (AggregateManagerEucalyptus.cloud['ip'], parts) = cloudURL.split(':')
317 # Create image bundles
318 images = self.getEucaConnection().get_all_images()
319 AggregateManagerEucalyptus.cloud['images'] = images
320 AggregateManagerEucalyptus.cloud['imageBundles'] = {}
322 if i.type != 'machine' or i.kernel_id is None: continue
323 name = os.path.dirname(i.location)
324 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
325 AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
327 # Initialize sqlite3 database and tables.
328 dbPath = '/etc/sfa/db'
329 dbName = 'euca_aggregate.db'
331 if not os.path.isdir(dbPath):
332 logger.info('%s not found. Creating directory ...' % dbPath)
335 conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
336 sqlhub.processConnection = conn
337 Slice.createTable(ifNotExists=True)
338 EucaInstance.createTable(ifNotExists=True)
339 Meta.createTable(ifNotExists=True)
341 # Start the update process to keep track of the meta data
342 # about Eucalyptus instance.
343 Process(target=AggregateManagerEucalyptus.updateMeta).start()
345 # Make sure the schema exists.
346 if not os.path.exists(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA):
347 err = 'Cannot location schema at %s' % AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA
352 # A separate process that will update the meta data.
356 logger = logging.getLogger('EucaMeta')
357 fileHandler = logging.FileHandler('/var/log/euca_meta.log')
358 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
359 logger.addHandler(fileHandler)
360 fileHandler.setLevel(logging.DEBUG)
361 logger.setLevel(logging.DEBUG)
366 # Get IDs of the instances that don't have IPs yet.
367 dbResults = Meta.select(
368 AND(Meta.q.pri_addr == None,
369 Meta.q.state != 'deleted')
371 dbResults = list(dbResults)
372 logger.debug('[update process] dbResults: %s' % dbResults)
377 instids.append(r.instance.instance_id)
378 logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
380 # Get instance information from Eucalyptus
381 conn = self.getEucaConnection()
383 reservations = conn.get_all_instances(instids)
384 for reservation in reservations:
385 vmInstances += reservation.instances
388 instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
389 for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
390 logger.debug('[update process] IP dict: %s' % str(instIPs))
392 # Update the local DB
393 for ipData in instIPs:
394 dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
396 logger.info('[update process] Could not find %s in DB' % ipData['id'])
398 dbInst.meta.pri_addr = ipData['pri_addr']
399 dbInst.meta.pub_addr = ipData['pub_addr']
400 dbInst.meta.state = 'running'
402 self.dumpinstanceInfo()
405 # Creates a connection to Eucalytpus. This function is inspired by
406 # the make_connection() in Euca2ools.
408 # @return A connection object or None
410 def getEucaConnection():
411 accessKey = AggregateManagerEucalyptus.cloud['access_key']
412 secretKey = AggregateManagerEucalyptus.cloud['secret_key']
413 eucaURL = AggregateManagerEucalyptus.cloud['cloud_url']
417 logger = logging.getLogger('EucaAggregate')
419 if not accessKey or not secretKey or not eucaURL:
420 logger.error('Please set ALL of the required environment ' \
421 'variables by sourcing the eucarc file.')
424 # Split the url into parts
425 if eucaURL.find('https://') >= 0:
427 eucaURL = eucaURL.replace('https://', '')
428 elif eucaURL.find('http://') >= 0:
430 eucaURL = eucaURL.replace('http://', '')
431 (eucaHost, parts) = eucaURL.split(':')
433 parts = parts.split('/')
434 eucaPort = int(parts[0])
436 srvPath = '/'.join(parts)
438 return boto.connect_ec2(aws_access_key_id=accessKey,
439 aws_secret_access_key=secretKey,
441 region=RegionInfo(None, 'eucalyptus', eucaHost),
445 def ListResources(api, creds, options):
446 call_id = options.get('call_id')
447 if Callids().already_handled(call_id): return ""
448 # get slice's hrn from options
449 xrn = options.get('geni_slice_urn', '')
450 hrn, type = urn_to_hrn(xrn)
451 logger = logging.getLogger('EucaAggregate')
453 # get hrn of the original caller
454 origin_hrn = options.get('origin_hrn', None)
456 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
458 conn = self.getEucaConnection()
461 logger.error('Cannot create a connection to Eucalyptus')
462 return 'Cannot create a connection to Eucalyptus'
466 zones = conn.get_all_zones(['verbose'])
467 p = ZoneResultParser(zones)
469 AggregateManagerEucalyptus.cloud['clusters'] = clusters
472 images = conn.get_all_images()
473 AggregateManagerEucalyptus.cloud['images'] = images
474 AggregateManagerEucalyptus.cloud['imageBundles'] = {}
476 if i.type != 'machine' or i.kernel_id is None: continue
477 name = os.path.dirname(i.location)
478 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
479 AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
482 keyPairs = conn.get_all_key_pairs()
483 AggregateManagerEucalyptus.cloud['keypairs'] = keyPairs
489 # Get the instances that belong to the given slice from sqlite3
490 # XXX use getOne() in production because the slice's hrn is supposed
491 # to be unique. For testing, uniqueness is turned off in the db.
492 # If the slice isn't found in the database, create a record for the
494 matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
496 theSlice = matchedSlices[-1]
498 theSlice = Slice(slice_hrn = hrn)
499 for instance in theSlice.instances:
500 instanceId.append(instance.instance_id)
502 # Get the information about those instances using their ids.
503 if len(instanceId) > 0:
504 reservations = conn.get_all_instances(instanceId)
507 for reservation in reservations:
508 for instance in reservation.instances:
509 instances.append(instance)
511 # Construct a dictionary for the EucaRSpecBuilder
513 for instance in instances:
514 instList = instancesDict.setdefault(instance.instance_type, [])
517 instInfoDict['id'] = instance.id
518 instInfoDict['public_dns'] = instance.public_dns_name
519 instInfoDict['state'] = instance.state
520 instInfoDict['key'] = instance.key_name
522 instList.append(instInfoDict)
523 AggregateManagerEucalyptus.cloud['instances'] = instancesDict
525 except EC2ResponseError, ec2RespErr:
526 errTree = ET.fromstring(ec2RespErr.body)
527 errMsgE = errTree.find('.//Message')
528 logger.error(errMsgE.text)
530 rspec = EucaRSpecBuilder(AggregateManagerEucalyptus.cloud).toXML()
532 # Remove the instances records so next time they won't
534 if 'instances' in AggregateManagerEucalyptus.cloud:
535 del AggregateManagerEucalyptus.cloud['instances']
540 Hook called via 'sfi.py create'
542 def CreateSliver(api, slice_xrn, creds, xml, users, options):
543 call_id = options.get('call_id')
544 if Callids().already_handled(call_id): return ""
546 logger = logging.getLogger('EucaAggregate')
547 logger.debug("In CreateSliver")
549 aggregate = PlAggregate(self.driver)
550 slices = PlSlices(self.driver)
551 (hrn, type) = urn_to_hrn(slice_xrn)
552 peer = slices.get_peer(hrn)
553 sfa_peer = slices.get_sfa_peer(hrn)
556 slice_record = users[0].get('slice_record', {})
558 conn = self.getEucaConnection()
560 logger.error('Cannot create a connection to Eucalyptus')
564 schemaXML = ET.parse(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA)
565 rspecValidator = ET.RelaxNG(schemaXML)
566 rspecXML = ET.XML(xml)
567 for network in rspecXML.iterfind("./network"):
568 if network.get('name') != AggregateManagerEucalyptus.cloud['name']:
569 # Throw away everything except my own RSpec
570 # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
571 network.getparent().remove(network)
572 if not rspecValidator(rspecXML):
573 error = rspecValidator.error_log.last_error
574 message = '%s (line %s)' % (error.message, error.line)
575 raise InvalidRSpec(message)
578 Create the sliver[s] (slice) at this aggregate.
579 Verify HRN and initialize the slice record in PLC if necessary.
582 # ensure site record exists
583 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
584 # ensure slice record exists
585 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
586 # ensure person records exists
587 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
589 # Get the slice from db or create one.
590 s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
592 s = Slice(slice_hrn = hrn)
594 # Process any changes in existing instance allocation
596 for sliceInst in s.instances:
597 pendingRmInst.append(sliceInst.instance_id)
598 existingInstGroup = rspecXML.findall(".//euca_instances")
599 for instGroup in existingInstGroup:
600 for existingInst in instGroup:
601 if existingInst.get('id') in pendingRmInst:
602 pendingRmInst.remove(existingInst.get('id'))
603 for inst in pendingRmInst:
604 dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
605 if dbInst.meta.state != 'deleted':
606 logger.debug('Instance %s will be terminated' % inst)
607 # Terminate instances one at a time for robustness
608 conn.terminate_instances([inst])
609 # Only change the state but do not remove the entry from the DB.
610 dbInst.meta.state = 'deleted'
611 #dbInst.destroySelf()
613 # Process new instance requests
614 requests = rspecXML.findall(".//request")
616 # Get all the public keys associate with slice.
620 logger.debug("Keys: %s" % user['keys'])
621 pubKeys = '\n'.join(keys)
622 logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
624 vmTypeElement = req.getparent()
625 instType = vmTypeElement.get('name')
626 numInst = int(req.find('instances').text)
628 bundleName = req.find('bundle').text
629 if not AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]:
630 logger.error('Cannot find bundle %s' % bundleName)
631 bundleInfo = AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]
632 instKernel = bundleInfo['kernelID']
633 instDiskImg = bundleInfo['imageID']
634 instRamDisk = bundleInfo['ramdiskID']
637 # Create the instances
638 for i in range(0, numInst):
639 eucaInst = EucaInstance(slice = s,
640 kernel_id = instKernel,
641 image_id = instDiskImg,
642 ramdisk_id = instRamDisk,
644 inst_type = instType,
645 meta = Meta(start_time=datetime.datetime.now()))
646 eucaInst.reserveInstance(conn, pubKeys)
648 # xxx - should return altered rspec
649 # with enough data for the client to understand what's happened
653 # Return information on the IP addresses bound to each slice's instances
655 def dumpInstanceInfo():
656 logger = logging.getLogger('EucaMeta')
657 outdir = "/var/www/html/euca/"
658 outfile = outdir + "instances.txt"
663 if e.errno != errno.EEXIST:
666 dbResults = Meta.select(
667 AND(Meta.q.pri_addr != None,
668 Meta.q.state == 'running')
670 dbResults = list(dbResults)
671 f = open(outfile, "w")
673 instId = r.instance.instance_id
675 hrn = r.instance.slice.slice_hrn
676 logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
677 f.write("%s %s %s\n" % (instId, ipaddr, hrn))
680 def GetVersion(api, options):
682 version_manager = VersionManager()
683 ad_rspec_versions = []
684 request_rspec_versions = []
685 for rspec_version in version_manager.versions:
686 if rspec_version.content_type in ['*', 'ad']:
687 ad_rspec_versions.append(rspec_version.to_dict())
688 if rspec_version.content_type in ['*', 'request']:
689 request_rspec_versions.append(rspec_version.to_dict())
691 version_more = {'interface':'aggregate',
696 'geni_request_rspec_versions': request_rspec_versions,
697 'geni_ad_rspec_versions': ad_rspec_versions,
699 return version_core(version_more)