1 from __future__ import with_statement
7 from multiprocessing import Process
11 from boto.ec2.regioninfo import RegionInfo
12 from boto.exception import EC2ResponseError
13 from ConfigParser import ConfigParser
14 from xmlbuilder import XMLBuilder
15 from lxml import etree as ET
16 from sqlobject import *
18 from sfa.util.faults import InvalidRSpec
19 from sfa.util.xrn import urn_to_hrn, Xrn
20 from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
21 from sfa.util.callids import Callids
22 #comes with its own logging
23 #from sfa.util.sfalogging import logger
24 from sfa.util.version import version_core
25 from sfa.trust.credential import Credential
26 from sfa.plc.aggregate import Aggregate
27 from sfa.plc.slices import Slice, Slices
28 from sfa.rspecs.version_manager import VersionManager
29 from sfa.rspecs.rspec import RSpec
30 # not sure what this used to be nor where it is now defined
31 #from sfa.rspecs.sfa_rspec import sfa_rspec_version
32 # most likely this should now be
33 #from sfa.rspecs.version_manager import VersionManager
36 # Meta data of an instance.
class Meta(SQLObject):
    """Bookkeeping row that accompanies one EucaInstance.

    Stores the instance's lifecycle state, its public and private
    addresses, and the time it was started.
    """

    # Reverse side of the EucaInstance.meta foreign key.
    instance = SingleJoin('EucaInstance')

    # A freshly created row is in state 'new'.
    state = StringCol(default='new')

    # Both addresses and the start time default to None.
    pub_addr = StringCol(default=None)
    pri_addr = StringCol(default=None)
    start_time = DateTimeCol(default=None)
46 # A representation of an Eucalyptus instance. This is a support class
47 # for instance <-> slice mapping.
# SQLObject model for one Eucalyptus VM. The columns record the ids of the
# kernel, disk image and ramdisk, the instance type and the key pair, plus
# foreign keys to the owning Slice row and to the instance's Meta row.
49 class EucaInstance(SQLObject):
# instance_id defaults to None; reserveInstance() assigns it from the
# reservation returned by Eucalyptus.
50 instance_id = StringCol(unique=True, default=None)
51 kernel_id = StringCol()
52 image_id = StringCol()
53 ramdisk_id = StringCol()
54 inst_type = StringCol()
55 key_pair = StringCol()
56 slice = ForeignKey('Slice')
57 meta = ForeignKey('Meta')
60 # Contacts Eucalyptus and tries to reserve this instance.
62 # @param botoConn A connection to Eucalyptus.
63 # @param pubKeys A list of public keys for the instance.
65 def reserveInstance(self, botoConn, pubKeys):
# Messages go to the shared 'EucaAggregate' logger.
66 logger = logging.getLogger('EucaAggregate')
67 logger.info('Reserving an instance: image: %s, kernel: ' \
68 '%s, ramdisk: %s, type: %s, key: %s' % \
69 (self.image_id, self.kernel_id, self.ramdisk_id,
70 self.inst_type, self.key_pair))
# NOTE(review): in this listing the run_instances(...) call has no visible
# closing line, the `except` below has no visible `try:`, and pubKeys is not
# referenced by any visible line -- confirm against the complete file.
73 reservation = botoConn.run_instances(self.image_id,
74 kernel_id = self.kernel_id,
75 ramdisk_id = self.ramdisk_id,
76 instance_type = self.inst_type,
77 key_name = self.key_pair,
# Every instance of the reservation overwrites instance_id, so the last one
# wins when the reservation holds more than one.
79 for instance in reservation.instances:
80 self.instance_id = instance.id
82 # If there is an error, destroy itself.
# The EC2 error body is parsed as XML and the text of its <Message> element
# is logged.
# NOTE(review): find() yields None when no <Message> element is present, in
# which case msg.text raises AttributeError -- confirm the body always has one.
83 except EC2ResponseError, ec2RespErr:
84 errTree = ET.fromstring(ec2RespErr.body)
85 msg = errTree.find('.//Message')
86 logger.error(msg.text)
90 # A representation of a PlanetLab slice. This is a support class
91 # for instance <-> slice mapping.
class Slice(SQLObject):
    """PlanetLab slice row used for the instance <-> slice mapping.

    Note that this definition rebinds the name ``Slice`` which the file
    also imports from ``sfa.plc.slices``.
    """

    # Human-readable name (hrn) of the slice.
    slice_hrn = StringCol()
    #slice_index = DatabaseIndex('slice_hrn')

    # EucaInstance rows whose `slice` foreign key points at this row.
    instances = MultipleJoin('EucaInstance')
99 # A class that builds the RSpec for Eucalyptus.
# Assembles the Eucalyptus RSpec XML with XMLBuilder from the cloud
# dictionary handed to the constructor.
101 class EucaRSpecBuilder(object):
103 # Initializes an RSpec builder
105 # @param cloud A dictionary containing data about a
106 # cloud (ex. clusters, ip)
107 def __init__(self, cloud):
# eucaRSpec holds the XMLBuilder document; cloudInfo keeps the caller's
# dictionary as-is (no copy is made).
108 self.eucaRSpec = XMLBuilder(format = True, tab_step = " ")
109 self.cloudInfo = cloud
112 # Creates a request stanza.
114 # @param num The number of instances to create.
115 # @param image The disk image id.
116 # @param kernel The kernel image id.
117 # @param keypair Key pair to embed.
118 # @param ramdisk Ramdisk id (optional).
120 def __requestXML(self, num, image, kernel, keypair, ramdisk = ''):
# The kernel, ramdisk and disk image ids are written as `id` attributes of
# their respective elements.
125 with xml.kernel_image(id=kernel):
131 with xml.ramdisk(id=ramdisk):
133 with xml.disk_image(id=image):
139 # Creates the cluster stanza.
141 # @param clusters Clusters information.
143 def __clustersXML(self, clusters):
144 cloud = self.cloudInfo
# One <cluster> element per entry; each cluster dict is read for its 'name'
# and 'instances' keys here.
147 for cluster in clusters:
148 instances = cluster['instances']
149 with xml.cluster(id=cluster['name']):
# Each `inst` is accessed by position; inst[0] becomes the vm_type name.
153 for inst in instances:
154 with xml.vm_type(name=inst[0]):
157 with xml.max_instances:
161 with xml.memory(unit='MB'):
163 with xml.disk_space(unit='GB'):
# Instances already recorded for this vm type under cloud['instances'] are
# listed with their id, state and public DNS name.
165 if 'instances' in cloud and inst[0] in cloud['instances']:
166 existingEucaInstances = cloud['instances'][inst[0]]
167 with xml.euca_instances:
168 for eucaInst in existingEucaInstances:
169 with xml.euca_instance(id=eucaInst['id']):
171 xml << eucaInst['state']
173 xml << eucaInst['public_dns']
# Emits one <bundle> element per key of `bundles`, using the key as its id.
175 def __imageBundleXML(self, bundles):
178 for bundle in bundles.keys():
179 with xml.bundle(id=bundle):
183 # Creates the Images stanza.
185 # @param images A list of images in Eucalyptus.
187 def __imagesXML(self, images):
# Each image contributes its id (as attribute), architecture and location.
191 with xml.image(id=image.id):
195 xml << image.architecture
199 xml << image.location
202 # Creates the KeyPairs stanza.
204 # @param keypairs A list of key pairs in Eucalyptus.
206 def __keyPairsXML(self, keypairs):
214 # Generates the RSpec.
# NOTE(review): the `def` line of this method is not part of this listing;
# ListResources invokes it as EucaRSpecBuilder(...).toXML() -- confirm.
217 logger = logging.getLogger('EucaAggregate')
# Missing cloud information is reported through the 'EucaAggregate' logger.
218 if not self.cloudInfo:
219 logger.error('No cloud information')
223 cloud = self.cloudInfo
224 with xml.RSpec(type='eucalyptus'):
225 with xml.network(name=cloud['name']):
# The key-pair and image stanzas are disabled (commented out); only the image
# bundles and the clusters are emitted.
228 #self.__keyPairsXML(cloud['keypairs'])
229 #self.__imagesXML(cloud['images'])
230 self.__imageBundleXML(cloud['imageBundles'])
231 self.__clustersXML(cloud['clusters'])
235 # A parser to parse the output of availability-zones.
237 # Note: Only one cluster is supported. If more than one, this will
# Turns the zone list into a list with a single cluster dictionary holding
# the keys 'name', 'ip' and 'instances'.
240 class ZoneResultParser(object):
241 def __init__(self, zones):
# Fewer than three zone entries are handled as a special case (the branch
# body is not visible in this listing).
245 if len(self.zones) < 3:
# Entry 0 describes the cluster itself: its name, and its ip in `state`.
251 cluster['name'] = self.zones[0].name
252 cluster['ip'] = self.zones[0].state
# Entry 1 is skipped; entries 2.. each describe one instance type.
254 for i in range(2, len(self.zones)):
255 currZone = self.zones[i]
# The instance type is the second whitespace-separated word of the name.
256 instType = currZone.name.split()[1]
# `state` is parsed as '<free>/<max> <cpus> <ram> <disk>': the part before
# '/' is the free count, the four numbers after it follow in that order.
258 stateString = currZone.state.split('/')
259 rscString = stateString[1].split()
261 instFree = int(stateString[0])
262 instMax = int(rscString[0])
263 instNumCpu = int(rscString[1])
264 instRam = int(rscString[2])
265 instDiskSpace = int(rscString[3])
# Consumers index this tuple by position (EucaRSpecBuilder uses element 0 as
# the vm_type name), so the field order matters.
267 instTuple = (instType, instFree, instMax, instNumCpu, instRam, instDiskSpace)
268 instList.append(instTuple)
269 cluster['instances'] = instList
270 clusterList.append(cluster)
# Aggregate manager for a Eucalyptus cloud. Configuration and cached cloud
# data are kept on the class itself (AggregateManagerEucalyptus.cloud,
# AggregateManagerEucalyptus._inited) rather than on instances.
274 class AggregateManagerEucalyptus:
276 # The data structure used to represent a cloud.
277 # It contains the cloud name, its ip address, image information,
278 # key pairs, and clusters information.
281 # The location of the RelaxNG schema.
282 EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
286 # the init_server mechanism has vanished
# Do nothing when _inited is already truthy; otherwise run init_server().
# NOTE(review): no visible line sets _inited to a truthy value -- confirm
# where that happens, otherwise init_server() runs on every call.
288 if AggregateManagerEucalyptus._inited: return
289 AggregateManagerEucalyptus.init_server()
291 # Initialize the aggregate manager by reading a configuration file.
# Besides reading the configuration this sets up file logging, caches the
# image bundles, opens the sqlite database and starts the updateMeta process.
# The 'EucaAggregate' logger writes DEBUG and above to /var/log/euca.log.
294 logger = logging.getLogger('EucaAggregate')
295 fileHandler = logging.FileHandler('/var/log/euca.log')
296 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
297 logger.addHandler(fileHandler)
298 fileHandler.setLevel(logging.DEBUG)
299 logger.setLevel(logging.DEBUG)
# The configuration is looked up in /etc/sfa and in the working directory.
301 configParser = ConfigParser()
302 configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
303 if len(configParser.sections()) < 1:
304 logger.error('No cloud defined in the config file')
305 raise Exception('Cannot find cloud definition in configuration file.')
307 # Only read the first section.
# The section name doubles as the cloud name.
308 cloudSec = configParser.sections()[0]
309 AggregateManagerEucalyptus.cloud['name'] = cloudSec
310 AggregateManagerEucalyptus.cloud['access_key'] = configParser.get(cloudSec, 'access_key')
311 AggregateManagerEucalyptus.cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
312 AggregateManagerEucalyptus.cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url')
# Drop the URL scheme, then keep the host part in front of ':' as cloud['ip'].
# NOTE(review): the two-name unpacking needs exactly one ':' left in the URL
# (an explicit port); any other shape raises ValueError.
313 cloudURL = AggregateManagerEucalyptus.cloud['cloud_url']
314 if cloudURL.find('https://') >= 0:
315 cloudURL = cloudURL.replace('https://', '')
316 elif cloudURL.find('http://') >= 0:
317 cloudURL = cloudURL.replace('http://', '')
318 (AggregateManagerEucalyptus.cloud['ip'], parts) = cloudURL.split(':')
320 # Create image bundles
# NOTE(review): `self` is used here although this routine is invoked on the
# class (AggregateManagerEucalyptus.init_server()) -- confirm its declaration.
321 images = self.getEucaConnection().get_all_images()
322 AggregateManagerEucalyptus.cloud['images'] = images
323 AggregateManagerEucalyptus.cloud['imageBundles'] = {}
# Only 'machine' images that have a kernel id become bundles. The bundle name
# is the directory part of the image location, so images sharing a directory
# overwrite each other.
325 if i.type != 'machine' or i.kernel_id is None: continue
326 name = os.path.dirname(i.location)
327 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
328 AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
330 # Initialize sqlite3 database and tables.
331 dbPath = '/etc/sfa/db'
332 dbName = 'euca_aggregate.db'
334 if not os.path.isdir(dbPath):
335 logger.info('%s not found. Creating directory ...' % dbPath)
# The connection becomes the process-wide SQLObject connection; each table is
# created only when it does not exist yet.
338 conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
339 sqlhub.processConnection = conn
340 Slice.createTable(ifNotExists=True)
341 EucaInstance.createTable(ifNotExists=True)
342 Meta.createTable(ifNotExists=True)
344 # Start the update process to keep track of the meta data
345 # about Eucalyptus instance.
346 Process(target=AggregateManagerEucalyptus.updateMeta).start()
348 # Make sure the schema exists.
349 if not os.path.exists(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA):
350 err = 'Cannot location schema at %s' % AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA
355 # A separate process that will update the meta data.
# It logs through its own 'EucaMeta' logger into /var/log/euca_meta.log,
# separately from the 'EucaAggregate' log.
359 logger = logging.getLogger('EucaMeta')
360 fileHandler = logging.FileHandler('/var/log/euca_meta.log')
361 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
362 logger.addHandler(fileHandler)
363 fileHandler.setLevel(logging.DEBUG)
364 logger.setLevel(logging.DEBUG)
369 # Get IDs of the instances that don't have IPs yet.
# That is: Meta rows without a private address that are not marked 'deleted'.
370 dbResults = Meta.select(
371 AND(Meta.q.pri_addr == None,
372 Meta.q.state != 'deleted')
374 dbResults = list(dbResults)
375 logger.debug('[update process] dbResults: %s' % dbResults)
380 instids.append(r.instance.instance_id)
381 logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
383 # Get instance information from Eucalyptus
# NOTE(review): `self` is used here although init_server starts this routine
# as Process(target=AggregateManagerEucalyptus.updateMeta) -- confirm how it
# is declared.
384 conn = self.getEucaConnection()
386 reservations = conn.get_all_instances(instids)
# Flatten the per-reservation instance lists into vmInstances.
387 for reservation in reservations:
388 vmInstances += reservation.instances
# Instances whose private DNS name is still '0.0.0.0' are left out --
# presumably the value reported before an address is assigned; confirm.
391 instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
392 for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
393 logger.debug('[update process] IP dict: %s' % str(instIPs))
395 # Update the local DB
396 for ipData in instIPs:
# getOne(None) yields None when no row matches the instance id.
397 dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
399 logger.info('[update process] Could not find %s in DB' % ipData['id'])
# Store both addresses and mark the instance as running.
401 dbInst.meta.pri_addr = ipData['pri_addr']
402 dbInst.meta.pub_addr = ipData['pub_addr']
403 dbInst.meta.state = 'running'
# NOTE(review): the method defined further down is spelled dumpInstanceInfo
# (capital I); this call uses dumpinstanceInfo -- likely a typo, confirm.
405 self.dumpinstanceInfo()
408 # Creates a connection to Eucalyptus. This function is inspired by
409 # the make_connection() in Euca2ools.
411 # @return A connection object or None
# NOTE(review): declared without parameters while callers in this file use
# self.getEucaConnection() -- confirm whether a decorator line precedes it.
413 def getEucaConnection():
# Credentials and endpoint come from the class-level cloud dictionary.
414 accessKey = AggregateManagerEucalyptus.cloud['access_key']
415 secretKey = AggregateManagerEucalyptus.cloud['secret_key']
416 eucaURL = AggregateManagerEucalyptus.cloud['cloud_url']
420 logger = logging.getLogger('EucaAggregate')
# All three settings are required.
# NOTE(review): the message speaks of environment variables / eucarc, but the
# values checked are the ones read from the cloud dictionary above.
422 if not accessKey or not secretKey or not eucaURL:
423 logger.error('Please set ALL of the required environment ' \
424 'variables by sourcing the eucarc file.')
427 # Split the url into parts
428 if eucaURL.find('https://') >= 0:
430 eucaURL = eucaURL.replace('https://', '')
431 elif eucaURL.find('http://') >= 0:
433 eucaURL = eucaURL.replace('http://', '')
# The host is everything before ':'; the remainder is split on '/', and its
# first element is the port. As in init_server, exactly one ':' is expected.
434 (eucaHost, parts) = eucaURL.split(':')
436 parts = parts.split('/')
437 eucaPort = int(parts[0])
439 srvPath = '/'.join(parts)
# The connection targets a RegionInfo named 'eucalyptus' at eucaHost.
441 return boto.connect_ec2(aws_access_key_id=accessKey,
442 aws_secret_access_key=secretKey,
444 region=RegionInfo(None, 'eucalyptus', eucaHost),
# Builds the Eucalyptus RSpec for this cloud: clusters, image bundles and,
# for the slice named in options['geni_slice_urn'], its known instances.
# NOTE(review): the signature has no `self`, yet the body calls
# self.getEucaConnection() -- confirm how this function is bound.
448 def ListResources(api, creds, options={}):
# A call id that was already handled yields an empty string.
449 call_id = options.get('call_id')
450 if Callids().already_handled(call_id): return ""
451 # get slice's hrn from options
452 xrn = options.get('geni_slice_urn', '')
453 hrn, type = urn_to_hrn(xrn)
454 logger = logging.getLogger('EucaAggregate')
456 # get hrn of the original caller
457 origin_hrn = options.get('origin_hrn', None)
# Caller hrn taken from the GID of the first credential.
459 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
461 conn = self.getEucaConnection()
# On a failed connection the error text itself is returned to the caller.
464 logger.error('Cannot create a connection to Eucalyptus')
465 return 'Cannot create a connection to Eucalyptus'
# The verbose zone listing is what ZoneResultParser consumes; the parsed
# clusters are cached in cloud['clusters'].
469 zones = conn.get_all_zones(['verbose'])
470 p = ZoneResultParser(zones)
472 AggregateManagerEucalyptus.cloud['clusters'] = clusters
# Refresh the cached images and rebuild the image bundles: only 'machine'
# images with a kernel id, keyed by the directory part of their location.
475 images = conn.get_all_images()
476 AggregateManagerEucalyptus.cloud['images'] = images
477 AggregateManagerEucalyptus.cloud['imageBundles'] = {}
479 if i.type != 'machine' or i.kernel_id is None: continue
480 name = os.path.dirname(i.location)
481 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
482 AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
485 keyPairs = conn.get_all_key_pairs()
486 AggregateManagerEucalyptus.cloud['keypairs'] = keyPairs
492 # Get the instances that belong to the given slice from sqlite3
493 # XXX use getOne() in production because the slice's hrn is supposed
494 # to be unique. For testing, uniqueness is turned off in the db.
495 # If the slice isn't found in the database, create a record for the
# When several rows share the hrn, the last one returned is used.
497 matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
499 theSlice = matchedSlices[-1]
501 theSlice = Slice(slice_hrn = hrn)
502 for instance in theSlice.instances:
503 instanceId.append(instance.instance_id)
505 # Get the information about those instances using their ids.
506 if len(instanceId) > 0:
507 reservations = conn.get_all_instances(instanceId)
510 for reservation in reservations:
511 for instance in reservation.instances:
512 instances.append(instance)
514 # Construct a dictionary for the EucaRSpecBuilder
# Instances are grouped by instance type; each entry carries the id, public
# DNS name, state and key name.
516 for instance in instances:
517 instList = instancesDict.setdefault(instance.instance_type, [])
520 instInfoDict['id'] = instance.id
521 instInfoDict['public_dns'] = instance.public_dns_name
522 instInfoDict['state'] = instance.state
523 instInfoDict['key'] = instance.key_name
525 instList.append(instInfoDict)
526 AggregateManagerEucalyptus.cloud['instances'] = instancesDict
# EC2 errors are logged using the text of the <Message> element of the
# XML error body.
528 except EC2ResponseError, ec2RespErr:
529 errTree = ET.fromstring(ec2RespErr.body)
530 errMsgE = errTree.find('.//Message')
531 logger.error(errMsgE.text)
533 rspec = EucaRSpecBuilder(AggregateManagerEucalyptus.cloud).toXML()
535 # Remove the instances records so next time they won't
537 if 'instances' in AggregateManagerEucalyptus.cloud:
538 del AggregateManagerEucalyptus.cloud['instances']
543 Hook called via 'sfi.py create'
# Applies a request RSpec to a slice: validates it, terminates instances the
# RSpec no longer lists, and reserves the newly requested ones.
# NOTE(review): the signature has no `self`, yet the body uses self.driver and
# self.getEucaConnection() -- confirm how this function is bound.
545 def CreateSliver(api, slice_xrn, creds, xml, users, options={}):
# A call id that was already handled yields an empty string.
546 call_id = options.get('call_id')
547 if Callids().already_handled(call_id): return ""
549 logger = logging.getLogger('EucaAggregate')
550 logger.debug("In CreateSliver")
552 aggregate = Aggregate(self.driver)
554 (hrn, type) = urn_to_hrn(slice_xrn)
555 peer = slices.get_peer(hrn)
556 sfa_peer = slices.get_sfa_peer(hrn)
# The slice record is read from the first user entry, defaulting to {}.
559 slice_record = users[0].get('slice_record', {})
561 conn = self.getEucaConnection()
563 logger.error('Cannot create a connection to Eucalyptus')
# Validate the request against the RelaxNG schema after dropping every
# <network> element that is not named after this cloud.
567 schemaXML = ET.parse(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA)
568 rspecValidator = ET.RelaxNG(schemaXML)
569 rspecXML = ET.XML(xml)
570 for network in rspecXML.iterfind("./network"):
571 if network.get('name') != AggregateManagerEucalyptus.cloud['name']:
572 # Throw away everything except my own RSpec
573 # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
574 network.getparent().remove(network)
# A failed validation raises InvalidRSpec with the last validator error.
575 if not rspecValidator(rspecXML):
576 error = rspecValidator.error_log.last_error
577 message = '%s (line %s)' % (error.message, error.line)
578 raise InvalidRSpec(message)
581 Create the sliver[s] (slice) at this aggregate.
582 Verify HRN and initialize the slice record in PLC if necessary.
585 # ensure site record exists
586 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
587 # ensure slice record exists
588 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
589 # ensure person records exists
590 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
592 # Get the slice from db or create one.
593 s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
595 s = Slice(slice_hrn = hrn)
597 # Process any changes in existing instance allocation
# Start from every instance recorded for the slice and strike out those the
# RSpec still lists under <euca_instances>; whatever remains is terminated.
599 for sliceInst in s.instances:
600 pendingRmInst.append(sliceInst.instance_id)
601 existingInstGroup = rspecXML.findall(".//euca_instances")
602 for instGroup in existingInstGroup:
603 for existingInst in instGroup:
604 if existingInst.get('id') in pendingRmInst:
605 pendingRmInst.remove(existingInst.get('id'))
606 for inst in pendingRmInst:
# NOTE(review): getOne(None) may yield None, in which case dbInst.meta on the
# next line raises AttributeError -- confirm a row always exists here.
607 dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
608 if dbInst.meta.state != 'deleted':
609 logger.debug('Instance %s will be terminated' % inst)
610 # Terminate instances one at a time for robustness
611 conn.terminate_instances([inst])
612 # Only change the state but do not remove the entry from the DB.
613 dbInst.meta.state = 'deleted'
614 #dbInst.destroySelf()
616 # Process new instance requests
617 requests = rspecXML.findall(".//request")
619 # Get all the public keys associate with slice.
623 logger.debug("Keys: %s" % user['keys'])
# The keys are handed to the instance as one newline-separated string.
624 pubKeys = '\n'.join(keys)
625 logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
# The instance type is the `name` attribute of the request's parent element.
627 vmTypeElement = req.getparent()
628 instType = vmTypeElement.get('name')
629 numInst = int(req.find('instances').text)
631 bundleName = req.find('bundle').text
# NOTE(review): an unknown bundle name raises KeyError on the subscript below
# before the 'Cannot find bundle' message can be logged -- confirm intent.
632 if not AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]:
633 logger.error('Cannot find bundle %s' % bundleName)
634 bundleInfo = AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]
635 instKernel = bundleInfo['kernelID']
636 instDiskImg = bundleInfo['imageID']
637 instRamDisk = bundleInfo['ramdiskID']
640 # Create the instances
# Each requested instance gets its own EucaInstance row and a fresh Meta row
# stamped with the current time, then is reserved on Eucalyptus.
641 for i in range(0, numInst):
642 eucaInst = EucaInstance(slice = s,
643 kernel_id = instKernel,
644 image_id = instDiskImg,
645 ramdisk_id = instRamDisk,
647 inst_type = instType,
648 meta = Meta(start_time=datetime.datetime.now()))
649 eucaInst.reserveInstance(conn, pubKeys)
651 # xxx - should return altered rspec
652 # with enough data for the client to understand what's happened
656 # Return information on the IP addresses bound to each slice's instances
# Writes one "<instance id> <ipaddr> <slice hrn>" line per selected Meta row
# to /var/www/html/euca/instances.txt and logs the same triple.
# NOTE(review): despite the comment above, no visible line returns a value.
658 def dumpInstanceInfo():
659 logger = logging.getLogger('EucaMeta')
660 outdir = "/var/www/html/euca/"
661 outfile = outdir + "instances.txt"
# EEXIST is the one errno singled out here -- presumably the output directory
# already existing; confirm.
666 if e.errno != errno.EEXIST:
# Select the rows that have a private address and are in state 'running'.
669 dbResults = Meta.select(
670 AND(Meta.q.pri_addr != None,
671 Meta.q.state == 'running')
673 dbResults = list(dbResults)
# Mode "w" replaces the contents of any earlier file.
674 f = open(outfile, "w")
676 instId = r.instance.instance_id
678 hrn = r.instance.slice.slice_hrn
679 logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
680 f.write("%s %s %s\n" % (instId, ipaddr, hrn))
# Reports this aggregate's version information, including the RSpec versions
# known to the VersionManager. The result is whatever version_core() returns
# for the fields collected here.
683 def GetVersion(api, options={}):
685 version_manager = VersionManager()
686 ad_rspec_versions = []
687 request_rspec_versions = []
# A version whose content_type is '*' is listed both as an advertisement
# ('ad') version and as a request version.
688 for rspec_version in version_manager.versions:
689 if rspec_version.content_type in ['*', 'ad']:
690 ad_rspec_versions.append(rspec_version.to_dict())
691 if rspec_version.content_type in ['*', 'request']:
692 request_rspec_versions.append(rspec_version.to_dict())
694 version_more = {'interface':'aggregate',
696 'geni_api': api.config.SFA_AGGREGATE_API_VERSION,
699 'geni_request_rspec_versions': request_rspec_versions,
700 'geni_ad_rspec_versions': ad_rspec_versions,
702 return version_core(version_more)