X-Git-Url: http://git.onelab.eu/?p=sfa.git;a=blobdiff_plain;f=sfa%2Fmanagers%2Faggregate_manager_eucalyptus.py;h=2356f333d7fc6f368b2cdcdf04734e990d690e60;hp=e9969f9a968ae19f607463421a41f56261c20787;hb=06768bd605e5d47fadfc90a35c74e30f267226a5;hpb=b9e582564804f4538056758e86dee84c8fc5e6f5 diff --git a/sfa/managers/aggregate_manager_eucalyptus.py b/sfa/managers/aggregate_manager_eucalyptus.py index e9969f9a..2356f333 100644 --- a/sfa/managers/aggregate_manager_eucalyptus.py +++ b/sfa/managers/aggregate_manager_eucalyptus.py @@ -1,9 +1,11 @@ from __future__ import with_statement import sys -import os +import os, errno import logging import datetime +from multiprocessing import Process +from time import sleep import boto from boto.ec2.regioninfo import RegionInfo @@ -13,31 +15,21 @@ from xmlbuilder import XMLBuilder from lxml import etree as ET from sqlobject import * -from sfa.util.faults import * -from sfa.util.xrn import urn_to_hrn -from sfa.util.rspec import RSpec -from sfa.server.registry import Registries -from sfa.trust.credential import Credential -from sfa.plc.api import SfaAPI -from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn +from sfa.util.faults import InvalidRSpec +from sfa.util.xrn import urn_to_hrn, Xrn from sfa.util.callids import Callids +#comes with its own logging +#from sfa.util.sfalogging import logger +from sfa.util.version import version_core -from threading import Thread -from time import sleep - -## -# The data structure used to represent a cloud. -# It contains the cloud name, its ip address, image information, -# key pairs, and clusters information. -# -cloud = {} +from sfa.trust.credential import Credential -## -# The location of the RelaxNG schema. -# -EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng' +from sfa.rspecs.version_manager import VersionManager +from sfa.rspecs.rspec import RSpec -api = SfaAPI() +from sfa.planetlab.plaggregate import PlAggregate +from sfa.planetlab.plslices import PlSlices +from sfa.planetlab.plxrn import slicename_to_hrn ## # Meta data of an instance. @@ -76,9 +68,6 @@ class EucaInstance(SQLObject): (self.image_id, self.kernel_id, self.ramdisk_id, self.inst_type, self.key_pair)) - # XXX The return statement is for testing. REMOVE in production - #return - try: reservation = botoConn.run_instances(self.image_id, kernel_id = self.kernel_id, @@ -90,7 +79,7 @@ class EucaInstance(SQLObject): self.instance_id = instance.id # If there is an error, destroy itself. - except EC2ResponseError, ec2RespErr: + except EC2ResponseError as ec2RespErr: errTree = ET.fromstring(ec2RespErr.body) msg = errTree.find('.//Message') logger.error(msg.text) @@ -105,148 +94,6 @@ class Slice(SQLObject): #slice_index = DatabaseIndex('slice_hrn') instances = MultipleJoin('EucaInstance') -## -# Initialize the aggregate manager by reading a configuration file. -# -def init_server(): - logger = logging.getLogger('EucaAggregate') - fileHandler = logging.FileHandler('/var/log/euca.log') - fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) - logger.addHandler(fileHandler) - logger.setLevel(logging.DEBUG) - - configParser = ConfigParser() - configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf']) - if len(configParser.sections()) < 1: - logger.error('No cloud defined in the config file') - raise Exception('Cannot find cloud definition in configuration file.') - - # Only read the first section. - cloudSec = configParser.sections()[0] - cloud['name'] = cloudSec - cloud['access_key'] = configParser.get(cloudSec, 'access_key') - cloud['secret_key'] = configParser.get(cloudSec, 'secret_key') - cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url') - cloudURL = cloud['cloud_url'] - if cloudURL.find('https://') >= 0: - cloudURL = cloudURL.replace('https://', '') - elif cloudURL.find('http://') >= 0: - cloudURL = cloudURL.replace('http://', '') - (cloud['ip'], parts) = cloudURL.split(':') - - # Create image bundles - images = getEucaConnection().get_all_images() - cloud['images'] = images - cloud['imageBundles'] = {} - for i in images: - if i.type != 'machine' or i.kernel_id is None: continue - name = os.path.dirname(i.location) - detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id} - cloud['imageBundles'][name] = detail - - # Initialize sqlite3 database and tables. - dbPath = '/etc/sfa/db' - dbName = 'euca_aggregate.db' - - if not os.path.isdir(dbPath): - logger.info('%s not found. Creating directory ...' % dbPath) - os.mkdir(dbPath) - - conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName)) - sqlhub.processConnection = conn - Slice.createTable(ifNotExists=True) - EucaInstance.createTable(ifNotExists=True) - IP.createTable(ifNotExists=True) - - # Start the update thread to keep track of the meta data - # about Eucalyptus instance. - Thread(target=updateMeta).start() - - # Make sure the schema exists. - if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA): - err = 'Cannot location schema at %s' % EUCALYPTUS_RSPEC_SCHEMA - logger.error(err) - raise Exception(err) - -## -# Creates a connection to Eucalytpus. This function is inspired by -# the make_connection() in Euca2ools. -# -# @return A connection object or None -# -def getEucaConnection(): - global cloud - accessKey = cloud['access_key'] - secretKey = cloud['secret_key'] - eucaURL = cloud['cloud_url'] - useSSL = False - srvPath = '/' - eucaPort = 8773 - logger = logging.getLogger('EucaAggregate') - - if not accessKey or not secretKey or not eucaURL: - logger.error('Please set ALL of the required environment ' \ - 'variables by sourcing the eucarc file.') - return None - - # Split the url into parts - if eucaURL.find('https://') >= 0: - useSSL = True - eucaURL = eucaURL.replace('https://', '') - elif eucaURL.find('http://') >= 0: - useSSL = False - eucaURL = eucaURL.replace('http://', '') - (eucaHost, parts) = eucaURL.split(':') - if len(parts) > 1: - parts = parts.split('/') - eucaPort = int(parts[0]) - parts = parts[1:] - srvPath = '/'.join(parts) - - return boto.connect_ec2(aws_access_key_id=accessKey, - aws_secret_access_key=secretKey, - is_secure=useSSL, - region=RegionInfo(None, 'eucalyptus', eucaHost), - port=eucaPort, - path=srvPath) - -## -# Returns a string of keys that belong to the users of the given slice. -# @param sliceHRN The hunman readable name of the slice. -# @return sting() -# -def getKeysForSlice(sliceHRN): - logger = logging.getLogger('EucaAggregate') - try: - # convert hrn to slice name - plSliceName = hrn_to_pl_slicename(sliceHRN) - except IndexError, e: - logger.error('Invalid slice name (%s)' % sliceHRN) - return [] - - # Get the slice's information - sliceData = api.plshell.GetSlices(api.plauth, {'name':plSliceName}) - if not sliceData: - logger.warn('Cannot get any data for slice %s' % plSliceName) - return [] - - # It should only return a list with len = 1 - sliceData = sliceData[0] - - keys = [] - person_ids = sliceData['person_ids'] - if not person_ids: - logger.warn('No users in slice %s' % sliceHRN) - return [] - - persons = api.plshell.GetPersons(api.plauth, person_ids) - for person in persons: - pkeys = api.plshell.GetKeys(api.plauth, person['key_ids']) - for key in pkeys: - keys.append(key['key']) - - return ''.join(keys) - ## # A class that builds the RSpec for Eucalyptus. # @@ -374,7 +221,7 @@ class EucaRSpecBuilder(object): xml = self.eucaRSpec cloud = self.cloudInfo with xml.RSpec(type='eucalyptus'): - with xml.network(id=cloud['name']): + with xml.network(name=cloud['name']): with xml.ipv4: xml << cloud['ip'] #self.__keyPairsXML(cloud['keypairs']) @@ -423,237 +270,432 @@ class ZoneResultParser(object): return clusterList -def ListResources(api, creds, options, call_id): - if Callids().already_handled(call_id): return "" - global cloud - # get slice's hrn from options - xrn = options.get('geni_slice_urn', '') - hrn, type = urn_to_hrn(xrn) - logger = logging.getLogger('EucaAggregate') - - # get hrn of the original caller - origin_hrn = options.get('origin_hrn', None) - if not origin_hrn: - origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn() - - conn = getEucaConnection() - - if not conn: - logger.error('Cannot create a connection to Eucalyptus') - return 'Cannot create a connection to Eucalyptus' - - try: - # Zones - zones = conn.get_all_zones(['verbose']) - p = ZoneResultParser(zones) - clusters = p.parse() - cloud['clusters'] = clusters - - # Images - images = conn.get_all_images() - cloud['images'] = images - cloud['imageBundles'] = {} +class AggregateManagerEucalyptus: + + # The data structure used to represent a cloud. + # It contains the cloud name, its ip address, image information, + # key pairs, and clusters information. + cloud = {} + + # The location of the RelaxNG schema. + EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng' + + _inited=False + + # the init_server mechanism has vanished + def __init__ (self, config): + if AggregateManagerEucalyptus._inited: return + AggregateManagerEucalyptus.init_server() + + # Initialize the aggregate manager by reading a configuration file. + @staticmethod + def init_server(): + logger = logging.getLogger('EucaAggregate') + fileHandler = logging.FileHandler('/var/log/euca.log') + fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + logger.addHandler(fileHandler) + fileHandler.setLevel(logging.DEBUG) + logger.setLevel(logging.DEBUG) + + configParser = ConfigParser() + configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf']) + if len(configParser.sections()) < 1: + logger.error('No cloud defined in the config file') + raise Exception('Cannot find cloud definition in configuration file.') + + # Only read the first section. + cloudSec = configParser.sections()[0] + AggregateManagerEucalyptus.cloud['name'] = cloudSec + AggregateManagerEucalyptus.cloud['access_key'] = configParser.get(cloudSec, 'access_key') + AggregateManagerEucalyptus.cloud['secret_key'] = configParser.get(cloudSec, 'secret_key') + AggregateManagerEucalyptus.cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url') + cloudURL = AggregateManagerEucalyptus.cloud['cloud_url'] + if cloudURL.find('https://') >= 0: + cloudURL = cloudURL.replace('https://', '') + elif cloudURL.find('http://') >= 0: + cloudURL = cloudURL.replace('http://', '') + (AggregateManagerEucalyptus.cloud['ip'], parts) = cloudURL.split(':') + + # Create image bundles + images = self.getEucaConnection().get_all_images() + AggregateManagerEucalyptus.cloud['images'] = images + AggregateManagerEucalyptus.cloud['imageBundles'] = {} for i in images: if i.type != 'machine' or i.kernel_id is None: continue name = os.path.dirname(i.location) detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id} - cloud['imageBundles'][name] = detail - - # Key Pairs - keyPairs = conn.get_all_key_pairs() - cloud['keypairs'] = keyPairs - - if hrn: - instanceId = [] - instances = [] - - # Get the instances that belong to the given slice from sqlite3 - # XXX use getOne() in production because the slice's hrn is supposed - # to be unique. For testing, uniqueness is turned off in the db. - # If the slice isn't found in the database, create a record for the - # slice. - matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn)) - if matchedSlices: - theSlice = matchedSlices[-1] - else: - theSlice = Slice(slice_hrn = hrn) - for instance in theSlice.instances: - instanceId.append(instance.instance_id) - - # Get the information about those instances using their ids. - if len(instanceId) > 0: - reservations = conn.get_all_instances(instanceId) - else: - reservations = [] + AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail + + # Initialize sqlite3 database and tables. + dbPath = '/etc/sfa/db' + dbName = 'euca_aggregate.db' + + if not os.path.isdir(dbPath): + logger.info('%s not found. Creating directory ...' % dbPath) + os.mkdir(dbPath) + + conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName)) + sqlhub.processConnection = conn + Slice.createTable(ifNotExists=True) + EucaInstance.createTable(ifNotExists=True) + Meta.createTable(ifNotExists=True) + + # Start the update process to keep track of the meta data + # about Eucalyptus instance. + Process(target=AggregateManagerEucalyptus.updateMeta).start() + + # Make sure the schema exists. + if not os.path.exists(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA): + err = 'Cannot location schema at %s' % AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA + logger.error(err) + raise Exception(err) + + # + # A separate process that will update the meta data. + # + @staticmethod + def updateMeta(): + logger = logging.getLogger('EucaMeta') + fileHandler = logging.FileHandler('/var/log/euca_meta.log') + fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + logger.addHandler(fileHandler) + fileHandler.setLevel(logging.DEBUG) + logger.setLevel(logging.DEBUG) + + while True: + sleep(30) + + # Get IDs of the instances that don't have IPs yet. + dbResults = Meta.select( + AND(Meta.q.pri_addr == None, + Meta.q.state != 'deleted') + ) + dbResults = list(dbResults) + logger.debug('[update process] dbResults: %s' % dbResults) + instids = [] + for r in dbResults: + if not r.instance: + continue + instids.append(r.instance.instance_id) + logger.debug('[update process] Instance Id: %s' % ', '.join(instids)) + + # Get instance information from Eucalyptus + conn = self.getEucaConnection() + vmInstances = [] + reservations = conn.get_all_instances(instids) for reservation in reservations: - for instance in reservation.instances: - instances.append(instance) - - # Construct a dictionary for the EucaRSpecBuilder - instancesDict = {} - for instance in instances: - instList = instancesDict.setdefault(instance.instance_type, []) - instInfoDict = {} - - instInfoDict['id'] = instance.id - instInfoDict['public_dns'] = instance.public_dns_name - instInfoDict['state'] = instance.state - instInfoDict['key'] = instance.key_name - - instList.append(instInfoDict) - cloud['instances'] = instancesDict - - except EC2ResponseError, ec2RespErr: - errTree = ET.fromstring(ec2RespErr.body) - errMsgE = errTree.find('.//Message') - logger.error(errMsgE.text) - - rspec = EucaRSpecBuilder(cloud).toXML() - - # Remove the instances records so next time they won't - # show up. - if 'instances' in cloud: - del cloud['instances'] - - return rspec - -""" -Hook called via 'sfi.py create' -""" -def CreateSliver(api, xrn, creds, xml, users, call_id): - if Callids().already_handled(call_id): return "" - - global cloud - hrn = urn_to_hrn(xrn)[0] - logger = logging.getLogger('EucaAggregate') - - conn = getEucaConnection() - if not conn: - logger.error('Cannot create a connection to Eucalyptus') - return "" - - # Validate RSpec - schemaXML = ET.parse(EUCALYPTUS_RSPEC_SCHEMA) - rspecValidator = ET.RelaxNG(schemaXML) - rspecXML = ET.XML(xml) - if not rspecValidator(rspecXML): - error = rspecValidator.error_log.last_error - message = '%s (line %s)' % (error.message, error.line) - # XXX: InvalidRSpec is new. Currently, I am not working with Trunk code. - #raise InvalidRSpec(message) - raise Exception(message) - - # Get the slice from db or create one. - s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None) - if s is None: - s = Slice(slice_hrn = hrn) - - # Process any changes in existing instance allocation - pendingRmInst = [] - for sliceInst in s.instances: - pendingRmInst.append(sliceInst.instance_id) - existingInstGroup = rspecXML.findall('.//euca_instances') - for instGroup in existingInstGroup: - for existingInst in instGroup: - if existingInst.get('id') in pendingRmInst: - pendingRmInst.remove(existingInst.get('id')) - for inst in pendingRmInst: - logger.debug('Instance %s will be terminated' % inst) - dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None) - # Only change the state but do not remove the entry from the DB. - dbInst.meta.state = 'deleted' - #dbInst.destroySelf() - conn.terminate_instances(pendingRmInst) - - # Process new instance requests - requests = rspecXML.findall('.//request') - if requests: - # Get all the public keys associate with slice. - pubKeys = getKeysForSlice(s.slice_hrn) - logger.debug('Passing the following keys to the instance:\n%s' % pubKeys) - for req in requests: - vmTypeElement = req.getparent() - instType = vmTypeElement.get('name') - numInst = int(req.find('instances').text) + vmInstances += reservation.instances + + # Check the IPs + instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name} + for i in vmInstances if i.private_dns_name != '0.0.0.0' ] + logger.debug('[update process] IP dict: %s' % str(instIPs)) + + # Update the local DB + for ipData in instIPs: + dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None) + if not dbInst: + logger.info('[update process] Could not find %s in DB' % ipData['id']) + continue + dbInst.meta.pri_addr = ipData['pri_addr'] + dbInst.meta.pub_addr = ipData['pub_addr'] + dbInst.meta.state = 'running' + + self.dumpinstanceInfo() + + ## + # Creates a connection to Eucalytpus. This function is inspired by + # the make_connection() in Euca2ools. + # + # @return A connection object or None + # + def getEucaConnection(): + accessKey = AggregateManagerEucalyptus.cloud['access_key'] + secretKey = AggregateManagerEucalyptus.cloud['secret_key'] + eucaURL = AggregateManagerEucalyptus.cloud['cloud_url'] + useSSL = False + srvPath = '/' + eucaPort = 8773 + logger = logging.getLogger('EucaAggregate') + + if not accessKey or not secretKey or not eucaURL: + logger.error('Please set ALL of the required environment ' \ + 'variables by sourcing the eucarc file.') + return None - bundleName = req.find('bundle').text - if not cloud['imageBundles'][bundleName]: - logger.error('Cannot find bundle %s' % bundleName) - bundleInfo = cloud['imageBundles'][bundleName] - instKernel = bundleInfo['kernelID'] - instDiskImg = bundleInfo['imageID'] - instRamDisk = bundleInfo['ramdiskID'] - instKey = None - - # Create the instances - for i in range(0, numInst): - eucaInst = EucaInstance(slice = s, - kernel_id = instKernel, - image_id = instDiskImg, - ramdisk_id = instRamDisk, - key_pair = instKey, - inst_type = instType, - meta = Meta(start_time=datetime.datetime.now())) - eucaInst.reserveInstance(conn, pubKeys) - - # xxx - should return altered rspec - # with enough data for the client to understand what's happened - return xml - -## -# A thread that will update the meta data. -# -def updateMeta(): - logger = logging.getLogger('EucaAggregate') - while True: - sleep(120) - - # Get IDs of the instances that don't have IPs yet. + # Split the url into parts + if eucaURL.find('https://') >= 0: + useSSL = True + eucaURL = eucaURL.replace('https://', '') + elif eucaURL.find('http://') >= 0: + useSSL = False + eucaURL = eucaURL.replace('http://', '') + (eucaHost, parts) = eucaURL.split(':') + if len(parts) > 1: + parts = parts.split('/') + eucaPort = int(parts[0]) + parts = parts[1:] + srvPath = '/'.join(parts) + + return boto.connect_ec2(aws_access_key_id=accessKey, + aws_secret_access_key=secretKey, + is_secure=useSSL, + region=RegionInfo(None, 'eucalyptus', eucaHost), + port=eucaPort, + path=srvPath) + + def ListResources(api, creds, options): + call_id = options.get('call_id') + if Callids().already_handled(call_id): return "" + # get slice's hrn from options + xrn = options.get('geni_slice_urn', '') + hrn, type = urn_to_hrn(xrn) + logger = logging.getLogger('EucaAggregate') + + # get hrn of the original caller + origin_hrn = options.get('origin_hrn', None) + if not origin_hrn: + origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn() + + conn = self.getEucaConnection() + + if not conn: + logger.error('Cannot create a connection to Eucalyptus') + return 'Cannot create a connection to Eucalyptus' + + try: + # Zones + zones = conn.get_all_zones(['verbose']) + p = ZoneResultParser(zones) + clusters = p.parse() + AggregateManagerEucalyptus.cloud['clusters'] = clusters + + # Images + images = conn.get_all_images() + AggregateManagerEucalyptus.cloud['images'] = images + AggregateManagerEucalyptus.cloud['imageBundles'] = {} + for i in images: + if i.type != 'machine' or i.kernel_id is None: continue + name = os.path.dirname(i.location) + detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id} + AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail + + # Key Pairs + keyPairs = conn.get_all_key_pairs() + AggregateManagerEucalyptus.cloud['keypairs'] = keyPairs + + if hrn: + instanceId = [] + instances = [] + + # Get the instances that belong to the given slice from sqlite3 + # XXX use getOne() in production because the slice's hrn is supposed + # to be unique. For testing, uniqueness is turned off in the db. + # If the slice isn't found in the database, create a record for the + # slice. + matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn)) + if matchedSlices: + theSlice = matchedSlices[-1] + else: + theSlice = Slice(slice_hrn = hrn) + for instance in theSlice.instances: + instanceId.append(instance.instance_id) + + # Get the information about those instances using their ids. + if len(instanceId) > 0: + reservations = conn.get_all_instances(instanceId) + else: + reservations = [] + for reservation in reservations: + for instance in reservation.instances: + instances.append(instance) + + # Construct a dictionary for the EucaRSpecBuilder + instancesDict = {} + for instance in instances: + instList = instancesDict.setdefault(instance.instance_type, []) + instInfoDict = {} + + instInfoDict['id'] = instance.id + instInfoDict['public_dns'] = instance.public_dns_name + instInfoDict['state'] = instance.state + instInfoDict['key'] = instance.key_name + + instList.append(instInfoDict) + AggregateManagerEucalyptus.cloud['instances'] = instancesDict + + except EC2ResponseError as ec2RespErr: + errTree = ET.fromstring(ec2RespErr.body) + errMsgE = errTree.find('.//Message') + logger.error(errMsgE.text) + + rspec = EucaRSpecBuilder(AggregateManagerEucalyptus.cloud).toXML() + + # Remove the instances records so next time they won't + # show up. + if 'instances' in AggregateManagerEucalyptus.cloud: + del AggregateManagerEucalyptus.cloud['instances'] + + return rspec + + """ + Hook called via 'sfi.py create' + """ + def CreateSliver(api, slice_xrn, creds, xml, users, options): + call_id = options.get('call_id') + if Callids().already_handled(call_id): return "" + + logger = logging.getLogger('EucaAggregate') + logger.debug("In CreateSliver") + + aggregate = PlAggregate(self.driver) + slices = PlSlices(self.driver) + (hrn, type) = urn_to_hrn(slice_xrn) + peer = slices.get_peer(hrn) + sfa_peer = slices.get_sfa_peer(hrn) + slice_record=None + if users: + slice_record = users[0].get('slice_record', {}) + + conn = self.getEucaConnection() + if not conn: + logger.error('Cannot create a connection to Eucalyptus') + return "" + + # Validate RSpec + schemaXML = ET.parse(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA) + rspecValidator = ET.RelaxNG(schemaXML) + rspecXML = ET.XML(xml) + for network in rspecXML.iterfind("./network"): + if network.get('name') != AggregateManagerEucalyptus.cloud['name']: + # Throw away everything except my own RSpec + # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id')) + network.getparent().remove(network) + if not rspecValidator(rspecXML): + error = rspecValidator.error_log.last_error + message = '%s (line %s)' % (error.message, error.line) + raise InvalidRSpec(message) + + """ + Create the sliver[s] (slice) at this aggregate. + Verify HRN and initialize the slice record in PLC if necessary. + """ + + # ensure site record exists + site = slices.verify_site(hrn, slice_record, peer, sfa_peer) + # ensure slice record exists + slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer) + # ensure person records exists + persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer) + + # Get the slice from db or create one. + s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None) + if s is None: + s = Slice(slice_hrn = hrn) + + # Process any changes in existing instance allocation + pendingRmInst = [] + for sliceInst in s.instances: + pendingRmInst.append(sliceInst.instance_id) + existingInstGroup = rspecXML.findall(".//euca_instances") + for instGroup in existingInstGroup: + for existingInst in instGroup: + if existingInst.get('id') in pendingRmInst: + pendingRmInst.remove(existingInst.get('id')) + for inst in pendingRmInst: + dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None) + if dbInst.meta.state != 'deleted': + logger.debug('Instance %s will be terminated' % inst) + # Terminate instances one at a time for robustness + conn.terminate_instances([inst]) + # Only change the state but do not remove the entry from the DB. + dbInst.meta.state = 'deleted' + #dbInst.destroySelf() + + # Process new instance requests + requests = rspecXML.findall(".//request") + if requests: + # Get all the public keys associate with slice. + keys = [] + for user in users: + keys += user['keys'] + logger.debug("Keys: %s" % user['keys']) + pubKeys = '\n'.join(keys) + logger.debug('Passing the following keys to the instance:\n%s' % pubKeys) + for req in requests: + vmTypeElement = req.getparent() + instType = vmTypeElement.get('name') + numInst = int(req.find('instances').text) + + bundleName = req.find('bundle').text + if not AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]: + logger.error('Cannot find bundle %s' % bundleName) + bundleInfo = AggregateManagerEucalyptus.cloud['imageBundles'][bundleName] + instKernel = bundleInfo['kernelID'] + instDiskImg = bundleInfo['imageID'] + instRamDisk = bundleInfo['ramdiskID'] + instKey = None + + # Create the instances + for i in range(0, numInst): + eucaInst = EucaInstance(slice = s, + kernel_id = instKernel, + image_id = instDiskImg, + ramdisk_id = instRamDisk, + key_pair = instKey, + inst_type = instType, + meta = Meta(start_time=datetime.datetime.utcnow())) + eucaInst.reserveInstance(conn, pubKeys) + + # xxx - should return altered rspec + # with enough data for the client to understand what's happened + return xml + + ## + # Return information on the IP addresses bound to each slice's instances + # + def dumpInstanceInfo(): + logger = logging.getLogger('EucaMeta') + outdir = "/var/www/html/euca/" + outfile = outdir + "instances.txt" + + try: + os.makedirs(outdir) + except OSError as e: + if e.errno != errno.EEXIST: + raise + dbResults = Meta.select( - AND(Meta.q.pri_addr == None, - Meta.q.state != 'deleted') - ) + AND(Meta.q.pri_addr != None, + Meta.q.state == 'running') + ) dbResults = list(dbResults) - logger.debug('[update thread] dbResults: %s' % dbResults) - instids = [] + f = open(outfile, "w") for r in dbResults: - instids.append(r.instance.instance_id) - logger.debug('[update thread] Instance Id: %s' % ', '.join(instids)) - - # Get instance information from Eucalyptus - conn = getEucaConnection() - vmInstances = [] - reservations = conn.get_all_instances(instids) - for reservation in reservations: - vmInstances += reservation.instances - - # Check the IPs - instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name} - for i in vmInstances if i.private_dns_name != '0.0.0.0' ] - logger.debug('[update thread] IP dict: %s' % str(instIPs)) - - # Update the local DB - for ipData in instIPs: - dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None) - if not dbInst: - logger.info('[update thread] Could not find %s in DB' % ipData['id']) - continue - dbInst.meta.pri_addr = ipData['pri_addr'] - dbInst.meta.pub_addr = ipData['pub_addr'] - dbInst.meta.state = 'running' - -def main(): - init_server() - - #theRSpec = None - #with open(sys.argv[1]) as xml: - # theRSpec = xml.read() - #CreateSliver(None, 'planetcloud.pc.test', theRSpec, 'call-id-cloudtest') - - #rspec = ListResources('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy', 'test_euca') - #print rspec - print getKeysForSlice('gc.gc.test1') - -if __name__ == "__main__": - main() - + instId = r.instance.instance_id + ipaddr = r.pri_addr + hrn = r.instance.slice.slice_hrn + logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn)) + f.write("%s %s %s\n" % (instId, ipaddr, hrn)) + f.close() + + def GetVersion(api, options): + + version_manager = VersionManager() + ad_rspec_versions = [] + request_rspec_versions = [] + for rspec_version in version_manager.versions: + if rspec_version.content_type in ['*', 'ad']: + ad_rspec_versions.append(rspec_version.to_dict()) + if rspec_version.content_type in ['*', 'request']: + request_rspec_versions.append(rspec_version.to_dict()) + xrn=Xrn(api.hrn) + version_more = {'interface':'aggregate', + 'sfa': 1, + 'geni_api': '2', + 'testbed':'myplc', + 'hrn':xrn.get_hrn(), + 'geni_request_rspec_versions': request_rspec_versions, + 'geni_ad_rspec_versions': ad_rspec_versions, + } + return version_core(version_more)