1 from __future__ import with_statement
7 from multiprocessing import Process
11 from boto.ec2.regioninfo import RegionInfo
12 from boto.exception import EC2ResponseError
13 from ConfigParser import ConfigParser
14 from xmlbuilder import XMLBuilder
15 from lxml import etree as ET
16 from sqlobject import *
18 from sfa.util.faults import InvalidRSpec
19 from sfa.util.xrn import urn_to_hrn, Xrn
20 from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
21 from sfa.util.callids import Callids
22 #comes with its own logging
23 #from sfa.util.sfalogging import logger
24 from sfa.util.version import version_core
25 from sfa.trust.credential import Credential
26 from sfa.plc.plaggregate import PlAggregate
27 # No Slice symbol in there
28 #from sfa.plc.plslices import Slice, Slices
29 from sfa.plc.plslices import PlSlices
30 from sfa.rspecs.version_manager import VersionManager
31 from sfa.rspecs.rspec import RSpec
32 # not sure what this used to be nor where it is now defined
33 #from sfa.rspecs.sfa_rspec import sfa_rspec_version
34 # most likely this should now be
35 #from sfa.rspecs.version_manager import VersionManager
38 # Meta data of an instance.
40 class Meta(SQLObject):
41 instance = SingleJoin('EucaInstance')
42 state = StringCol(default = 'new')
43 pub_addr = StringCol(default = None)
44 pri_addr = StringCol(default = None)
45 start_time = DateTimeCol(default = None)
48 # A representation of an Eucalyptus instance. This is a support class
49 # for instance <-> slice mapping.
51 class EucaInstance(SQLObject):
52 instance_id = StringCol(unique=True, default=None)
53 kernel_id = StringCol()
54 image_id = StringCol()
55 ramdisk_id = StringCol()
56 inst_type = StringCol()
57 key_pair = StringCol()
58 slice = ForeignKey('Slice')
59 meta = ForeignKey('Meta')
62 # Contacts Eucalyptus and tries to reserve this instance.
64 # @param botoConn A connection to Eucalyptus.
65 # @param pubKeys A list of public keys for the instance.
67 def reserveInstance(self, botoConn, pubKeys):
68 logger = logging.getLogger('EucaAggregate')
69 logger.info('Reserving an instance: image: %s, kernel: ' \
70 '%s, ramdisk: %s, type: %s, key: %s' % \
71 (self.image_id, self.kernel_id, self.ramdisk_id,
72 self.inst_type, self.key_pair))
75 reservation = botoConn.run_instances(self.image_id,
76 kernel_id = self.kernel_id,
77 ramdisk_id = self.ramdisk_id,
78 instance_type = self.inst_type,
79 key_name = self.key_pair,
81 for instance in reservation.instances:
82 self.instance_id = instance.id
84 # If there is an error, destroy itself.
85 except EC2ResponseError, ec2RespErr:
86 errTree = ET.fromstring(ec2RespErr.body)
87 msg = errTree.find('.//Message')
88 logger.error(msg.text)
92 # A representation of a PlanetLab slice. This is a support class
93 # for instance <-> slice mapping.
95 class Slice(SQLObject):
96 slice_hrn = StringCol()
97 #slice_index = DatabaseIndex('slice_hrn')
98 instances = MultipleJoin('EucaInstance')
101 # A class that builds the RSpec for Eucalyptus.
103 class EucaRSpecBuilder(object):
105 # Initizes a RSpec builder
107 # @param cloud A dictionary containing data about a
108 # cloud (ex. clusters, ip)
109 def __init__(self, cloud):
110 self.eucaRSpec = XMLBuilder(format = True, tab_step = " ")
111 self.cloudInfo = cloud
114 # Creates a request stanza.
116 # @param num The number of instances to create.
117 # @param image The disk image id.
118 # @param kernel The kernel image id.
119 # @param keypair Key pair to embed.
120 # @param ramdisk Ramdisk id (optional).
122 def __requestXML(self, num, image, kernel, keypair, ramdisk = ''):
127 with xml.kernel_image(id=kernel):
133 with xml.ramdisk(id=ramdisk):
135 with xml.disk_image(id=image):
141 # Creates the cluster stanza.
143 # @param clusters Clusters information.
145 def __clustersXML(self, clusters):
146 cloud = self.cloudInfo
149 for cluster in clusters:
150 instances = cluster['instances']
151 with xml.cluster(id=cluster['name']):
155 for inst in instances:
156 with xml.vm_type(name=inst[0]):
159 with xml.max_instances:
163 with xml.memory(unit='MB'):
165 with xml.disk_space(unit='GB'):
167 if 'instances' in cloud and inst[0] in cloud['instances']:
168 existingEucaInstances = cloud['instances'][inst[0]]
169 with xml.euca_instances:
170 for eucaInst in existingEucaInstances:
171 with xml.euca_instance(id=eucaInst['id']):
173 xml << eucaInst['state']
175 xml << eucaInst['public_dns']
177 def __imageBundleXML(self, bundles):
180 for bundle in bundles.keys():
181 with xml.bundle(id=bundle):
185 # Creates the Images stanza.
187 # @param images A list of images in Eucalyptus.
189 def __imagesXML(self, images):
193 with xml.image(id=image.id):
197 xml << image.architecture
201 xml << image.location
204 # Creates the KeyPairs stanza.
206 # @param keypairs A list of key pairs in Eucalyptus.
208 def __keyPairsXML(self, keypairs):
216 # Generates the RSpec.
219 logger = logging.getLogger('EucaAggregate')
220 if not self.cloudInfo:
221 logger.error('No cloud information')
225 cloud = self.cloudInfo
226 with xml.RSpec(type='eucalyptus'):
227 with xml.network(name=cloud['name']):
230 #self.__keyPairsXML(cloud['keypairs'])
231 #self.__imagesXML(cloud['images'])
232 self.__imageBundleXML(cloud['imageBundles'])
233 self.__clustersXML(cloud['clusters'])
237 # A parser to parse the output of availability-zones.
239 # Note: Only one cluster is supported. If more than one, this will
242 class ZoneResultParser(object):
243 def __init__(self, zones):
247 if len(self.zones) < 3:
253 cluster['name'] = self.zones[0].name
254 cluster['ip'] = self.zones[0].state
256 for i in range(2, len(self.zones)):
257 currZone = self.zones[i]
258 instType = currZone.name.split()[1]
260 stateString = currZone.state.split('/')
261 rscString = stateString[1].split()
263 instFree = int(stateString[0])
264 instMax = int(rscString[0])
265 instNumCpu = int(rscString[1])
266 instRam = int(rscString[2])
267 instDiskSpace = int(rscString[3])
269 instTuple = (instType, instFree, instMax, instNumCpu, instRam, instDiskSpace)
270 instList.append(instTuple)
271 cluster['instances'] = instList
272 clusterList.append(cluster)
276 class AggregateManagerEucalyptus:
278 # The data structure used to represent a cloud.
279 # It contains the cloud name, its ip address, image information,
280 # key pairs, and clusters information.
283 # The location of the RelaxNG schema.
284 EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
288 # the init_server mechanism has vanished
289 def __init__ (self, config):
290 if AggregateManagerEucalyptus._inited: return
291 AggregateManagerEucalyptus.init_server()
293 # Initialize the aggregate manager by reading a configuration file.
296 logger = logging.getLogger('EucaAggregate')
297 fileHandler = logging.FileHandler('/var/log/euca.log')
298 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
299 logger.addHandler(fileHandler)
300 fileHandler.setLevel(logging.DEBUG)
301 logger.setLevel(logging.DEBUG)
303 configParser = ConfigParser()
304 configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
305 if len(configParser.sections()) < 1:
306 logger.error('No cloud defined in the config file')
307 raise Exception('Cannot find cloud definition in configuration file.')
309 # Only read the first section.
310 cloudSec = configParser.sections()[0]
311 AggregateManagerEucalyptus.cloud['name'] = cloudSec
312 AggregateManagerEucalyptus.cloud['access_key'] = configParser.get(cloudSec, 'access_key')
313 AggregateManagerEucalyptus.cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
314 AggregateManagerEucalyptus.cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url')
315 cloudURL = AggregateManagerEucalyptus.cloud['cloud_url']
316 if cloudURL.find('https://') >= 0:
317 cloudURL = cloudURL.replace('https://', '')
318 elif cloudURL.find('http://') >= 0:
319 cloudURL = cloudURL.replace('http://', '')
320 (AggregateManagerEucalyptus.cloud['ip'], parts) = cloudURL.split(':')
322 # Create image bundles
323 images = self.getEucaConnection().get_all_images()
324 AggregateManagerEucalyptus.cloud['images'] = images
325 AggregateManagerEucalyptus.cloud['imageBundles'] = {}
327 if i.type != 'machine' or i.kernel_id is None: continue
328 name = os.path.dirname(i.location)
329 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
330 AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
332 # Initialize sqlite3 database and tables.
333 dbPath = '/etc/sfa/db'
334 dbName = 'euca_aggregate.db'
336 if not os.path.isdir(dbPath):
337 logger.info('%s not found. Creating directory ...' % dbPath)
340 conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
341 sqlhub.processConnection = conn
342 Slice.createTable(ifNotExists=True)
343 EucaInstance.createTable(ifNotExists=True)
344 Meta.createTable(ifNotExists=True)
346 # Start the update process to keep track of the meta data
347 # about Eucalyptus instance.
348 Process(target=AggregateManagerEucalyptus.updateMeta).start()
350 # Make sure the schema exists.
351 if not os.path.exists(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA):
352 err = 'Cannot location schema at %s' % AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA
357 # A separate process that will update the meta data.
361 logger = logging.getLogger('EucaMeta')
362 fileHandler = logging.FileHandler('/var/log/euca_meta.log')
363 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
364 logger.addHandler(fileHandler)
365 fileHandler.setLevel(logging.DEBUG)
366 logger.setLevel(logging.DEBUG)
371 # Get IDs of the instances that don't have IPs yet.
372 dbResults = Meta.select(
373 AND(Meta.q.pri_addr == None,
374 Meta.q.state != 'deleted')
376 dbResults = list(dbResults)
377 logger.debug('[update process] dbResults: %s' % dbResults)
382 instids.append(r.instance.instance_id)
383 logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
385 # Get instance information from Eucalyptus
386 conn = self.getEucaConnection()
388 reservations = conn.get_all_instances(instids)
389 for reservation in reservations:
390 vmInstances += reservation.instances
393 instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
394 for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
395 logger.debug('[update process] IP dict: %s' % str(instIPs))
397 # Update the local DB
398 for ipData in instIPs:
399 dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
401 logger.info('[update process] Could not find %s in DB' % ipData['id'])
403 dbInst.meta.pri_addr = ipData['pri_addr']
404 dbInst.meta.pub_addr = ipData['pub_addr']
405 dbInst.meta.state = 'running'
407 self.dumpinstanceInfo()
410 # Creates a connection to Eucalytpus. This function is inspired by
411 # the make_connection() in Euca2ools.
413 # @return A connection object or None
415 def getEucaConnection():
416 accessKey = AggregateManagerEucalyptus.cloud['access_key']
417 secretKey = AggregateManagerEucalyptus.cloud['secret_key']
418 eucaURL = AggregateManagerEucalyptus.cloud['cloud_url']
422 logger = logging.getLogger('EucaAggregate')
424 if not accessKey or not secretKey or not eucaURL:
425 logger.error('Please set ALL of the required environment ' \
426 'variables by sourcing the eucarc file.')
429 # Split the url into parts
430 if eucaURL.find('https://') >= 0:
432 eucaURL = eucaURL.replace('https://', '')
433 elif eucaURL.find('http://') >= 0:
435 eucaURL = eucaURL.replace('http://', '')
436 (eucaHost, parts) = eucaURL.split(':')
438 parts = parts.split('/')
439 eucaPort = int(parts[0])
441 srvPath = '/'.join(parts)
443 return boto.connect_ec2(aws_access_key_id=accessKey,
444 aws_secret_access_key=secretKey,
446 region=RegionInfo(None, 'eucalyptus', eucaHost),
450 def ListResources(api, creds, options):
451 call_id = options.get('call_id')
452 if Callids().already_handled(call_id): return ""
453 # get slice's hrn from options
454 xrn = options.get('geni_slice_urn', '')
455 hrn, type = urn_to_hrn(xrn)
456 logger = logging.getLogger('EucaAggregate')
458 # get hrn of the original caller
459 origin_hrn = options.get('origin_hrn', None)
461 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
463 conn = self.getEucaConnection()
466 logger.error('Cannot create a connection to Eucalyptus')
467 return 'Cannot create a connection to Eucalyptus'
471 zones = conn.get_all_zones(['verbose'])
472 p = ZoneResultParser(zones)
474 AggregateManagerEucalyptus.cloud['clusters'] = clusters
477 images = conn.get_all_images()
478 AggregateManagerEucalyptus.cloud['images'] = images
479 AggregateManagerEucalyptus.cloud['imageBundles'] = {}
481 if i.type != 'machine' or i.kernel_id is None: continue
482 name = os.path.dirname(i.location)
483 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
484 AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
487 keyPairs = conn.get_all_key_pairs()
488 AggregateManagerEucalyptus.cloud['keypairs'] = keyPairs
494 # Get the instances that belong to the given slice from sqlite3
495 # XXX use getOne() in production because the slice's hrn is supposed
496 # to be unique. For testing, uniqueness is turned off in the db.
497 # If the slice isn't found in the database, create a record for the
499 matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
501 theSlice = matchedSlices[-1]
503 theSlice = Slice(slice_hrn = hrn)
504 for instance in theSlice.instances:
505 instanceId.append(instance.instance_id)
507 # Get the information about those instances using their ids.
508 if len(instanceId) > 0:
509 reservations = conn.get_all_instances(instanceId)
512 for reservation in reservations:
513 for instance in reservation.instances:
514 instances.append(instance)
516 # Construct a dictionary for the EucaRSpecBuilder
518 for instance in instances:
519 instList = instancesDict.setdefault(instance.instance_type, [])
522 instInfoDict['id'] = instance.id
523 instInfoDict['public_dns'] = instance.public_dns_name
524 instInfoDict['state'] = instance.state
525 instInfoDict['key'] = instance.key_name
527 instList.append(instInfoDict)
528 AggregateManagerEucalyptus.cloud['instances'] = instancesDict
530 except EC2ResponseError, ec2RespErr:
531 errTree = ET.fromstring(ec2RespErr.body)
532 errMsgE = errTree.find('.//Message')
533 logger.error(errMsgE.text)
535 rspec = EucaRSpecBuilder(AggregateManagerEucalyptus.cloud).toXML()
537 # Remove the instances records so next time they won't
539 if 'instances' in AggregateManagerEucalyptus.cloud:
540 del AggregateManagerEucalyptus.cloud['instances']
545 Hook called via 'sfi.py create'
547 def CreateSliver(api, slice_xrn, creds, xml, users, options):
548 call_id = options.get('call_id')
549 if Callids().already_handled(call_id): return ""
551 logger = logging.getLogger('EucaAggregate')
552 logger.debug("In CreateSliver")
554 aggregate = PlAggregate(self.driver)
555 slices = PlSlices(self.driver)
556 (hrn, type) = urn_to_hrn(slice_xrn)
557 peer = slices.get_peer(hrn)
558 sfa_peer = slices.get_sfa_peer(hrn)
561 slice_record = users[0].get('slice_record', {})
563 conn = self.getEucaConnection()
565 logger.error('Cannot create a connection to Eucalyptus')
569 schemaXML = ET.parse(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA)
570 rspecValidator = ET.RelaxNG(schemaXML)
571 rspecXML = ET.XML(xml)
572 for network in rspecXML.iterfind("./network"):
573 if network.get('name') != AggregateManagerEucalyptus.cloud['name']:
574 # Throw away everything except my own RSpec
575 # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
576 network.getparent().remove(network)
577 if not rspecValidator(rspecXML):
578 error = rspecValidator.error_log.last_error
579 message = '%s (line %s)' % (error.message, error.line)
580 raise InvalidRSpec(message)
583 Create the sliver[s] (slice) at this aggregate.
584 Verify HRN and initialize the slice record in PLC if necessary.
587 # ensure site record exists
588 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
589 # ensure slice record exists
590 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
591 # ensure person records exists
592 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
594 # Get the slice from db or create one.
595 s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
597 s = Slice(slice_hrn = hrn)
599 # Process any changes in existing instance allocation
601 for sliceInst in s.instances:
602 pendingRmInst.append(sliceInst.instance_id)
603 existingInstGroup = rspecXML.findall(".//euca_instances")
604 for instGroup in existingInstGroup:
605 for existingInst in instGroup:
606 if existingInst.get('id') in pendingRmInst:
607 pendingRmInst.remove(existingInst.get('id'))
608 for inst in pendingRmInst:
609 dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
610 if dbInst.meta.state != 'deleted':
611 logger.debug('Instance %s will be terminated' % inst)
612 # Terminate instances one at a time for robustness
613 conn.terminate_instances([inst])
614 # Only change the state but do not remove the entry from the DB.
615 dbInst.meta.state = 'deleted'
616 #dbInst.destroySelf()
618 # Process new instance requests
619 requests = rspecXML.findall(".//request")
621 # Get all the public keys associate with slice.
625 logger.debug("Keys: %s" % user['keys'])
626 pubKeys = '\n'.join(keys)
627 logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
629 vmTypeElement = req.getparent()
630 instType = vmTypeElement.get('name')
631 numInst = int(req.find('instances').text)
633 bundleName = req.find('bundle').text
634 if not AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]:
635 logger.error('Cannot find bundle %s' % bundleName)
636 bundleInfo = AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]
637 instKernel = bundleInfo['kernelID']
638 instDiskImg = bundleInfo['imageID']
639 instRamDisk = bundleInfo['ramdiskID']
642 # Create the instances
643 for i in range(0, numInst):
644 eucaInst = EucaInstance(slice = s,
645 kernel_id = instKernel,
646 image_id = instDiskImg,
647 ramdisk_id = instRamDisk,
649 inst_type = instType,
650 meta = Meta(start_time=datetime.datetime.now()))
651 eucaInst.reserveInstance(conn, pubKeys)
653 # xxx - should return altered rspec
654 # with enough data for the client to understand what's happened
658 # Return information on the IP addresses bound to each slice's instances
660 def dumpInstanceInfo():
661 logger = logging.getLogger('EucaMeta')
662 outdir = "/var/www/html/euca/"
663 outfile = outdir + "instances.txt"
668 if e.errno != errno.EEXIST:
671 dbResults = Meta.select(
672 AND(Meta.q.pri_addr != None,
673 Meta.q.state == 'running')
675 dbResults = list(dbResults)
676 f = open(outfile, "w")
678 instId = r.instance.instance_id
680 hrn = r.instance.slice.slice_hrn
681 logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
682 f.write("%s %s %s\n" % (instId, ipaddr, hrn))
685 def GetVersion(api, options):
687 version_manager = VersionManager()
688 ad_rspec_versions = []
689 request_rspec_versions = []
690 for rspec_version in version_manager.versions:
691 if rspec_version.content_type in ['*', 'ad']:
692 ad_rspec_versions.append(rspec_version.to_dict())
693 if rspec_version.content_type in ['*', 'request']:
694 request_rspec_versions.append(rspec_version.to_dict())
696 version_more = {'interface':'aggregate',
701 'geni_request_rspec_versions': request_rspec_versions,
702 'geni_ad_rspec_versions': ad_rspec_versions,
704 return version_core(version_more)