X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Fmanagers%2Faggregate_manager_eucalyptus.py;h=5e968547dd5a275348c0b2ccec46a2a7e7b770a8;hb=fc4b3be087bdf9daa44cf701d22f6798869f5577;hp=e62381a12470b493aa8db2a36ad27fcd4e0b8ed1;hpb=52b4cb21f33f380f55d467709b1b9bc8ba84bd57;p=sfa.git diff --git a/sfa/managers/aggregate_manager_eucalyptus.py b/sfa/managers/aggregate_manager_eucalyptus.py index e62381a1..5e968547 100644 --- a/sfa/managers/aggregate_manager_eucalyptus.py +++ b/sfa/managers/aggregate_manager_eucalyptus.py @@ -1,9 +1,9 @@ from __future__ import with_statement -from sfa.util.faults import * -from sfa.util.namespace import * -from sfa.util.rspec import RSpec -from sfa.server.registry import Registries -from sfa.plc.nodes import * + +import sys +import os +import logging +import datetime import boto from boto.ec2.regioninfo import RegionInfo @@ -13,8 +13,20 @@ from xmlbuilder import XMLBuilder from lxml import etree as ET from sqlobject import * -import sys -import os +from sfa.util.faults import * +from sfa.util.xrn import urn_to_hrn, Xrn +from sfa.util.rspec import RSpec +from sfa.server.registry import Registries +from sfa.trust.credential import Credential +from sfa.plc.api import SfaAPI +from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn +from sfa.util.callids import Callids +from sfa.util.sfalogging import sfa_logger +from sfa.rspecs.sfa_rspec import sfa_rspec_version +from sfa.util.version import version_core + +from threading import Thread +from time import sleep ## # The data structure used to represent a cloud. @@ -28,6 +40,18 @@ cloud = {} # EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng' +api = SfaAPI() + +## +# Meta data of an instance. +# +class Meta(SQLObject): + instance = SingleJoin('EucaInstance') + state = StringCol(default = 'new') + pub_addr = StringCol(default = None) + pri_addr = StringCol(default = None) + start_time = DateTimeCol(default = None) + ## # A representation of an Eucalyptus instance. This is a support class # for instance <-> slice mapping. @@ -39,18 +63,21 @@ class EucaInstance(SQLObject): ramdisk_id = StringCol() inst_type = StringCol() key_pair = StringCol() - slice = ForeignKey('Slice') + slice = ForeignKey('Slice') + meta = ForeignKey('Meta') ## # Contacts Eucalyptus and tries to reserve this instance. # # @param botoConn A connection to Eucalyptus. + # @param pubKeys A list of public keys for the instance. # - def reserveInstance(self, botoConn): - print >>sys.stderr, 'Reserving an instance: image: %s, kernel: ' \ - '%s, ramdisk: %s, type: %s, key: %s' % \ - (self.image_id, self.kernel_id, self.ramdisk_id, - self.inst_type, self.key_pair) + def reserveInstance(self, botoConn, pubKeys): + logger = logging.getLogger('EucaAggregate') + logger.info('Reserving an instance: image: %s, kernel: ' \ + '%s, ramdisk: %s, type: %s, key: %s' % \ + (self.image_id, self.kernel_id, self.ramdisk_id, + self.inst_type, self.key_pair)) # XXX The return statement is for testing. REMOVE in production #return @@ -60,7 +87,8 @@ class EucaInstance(SQLObject): kernel_id = self.kernel_id, ramdisk_id = self.ramdisk_id, instance_type = self.inst_type, - key_name = self.key_pair) + key_name = self.key_pair, + user_data = pubKeys) for instance in reservation.instances: self.instance_id = instance.id @@ -68,7 +96,7 @@ class EucaInstance(SQLObject): except EC2ResponseError, ec2RespErr: errTree = ET.fromstring(ec2RespErr.body) msg = errTree.find('.//Message') - print >>sys.stderr, msg.text + logger.error(msg.text) self.destroySelf() ## @@ -84,10 +112,16 @@ class Slice(SQLObject): # Initialize the aggregate manager by reading a configuration file. # def init_server(): + logger = logging.getLogger('EucaAggregate') + fileHandler = logging.FileHandler('/var/log/euca.log') + fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + logger.addHandler(fileHandler) + logger.setLevel(logging.DEBUG) + configParser = ConfigParser() configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf']) if len(configParser.sections()) < 1: - print >>sys.stderr, 'No cloud defined in the config file' + logger.error('No cloud defined in the config file') raise Exception('Cannot find cloud definition in configuration file.') # Only read the first section. @@ -103,23 +137,38 @@ def init_server(): cloudURL = cloudURL.replace('http://', '') (cloud['ip'], parts) = cloudURL.split(':') - # Initialize sqlite3 database. + # Create image bundles + images = getEucaConnection().get_all_images() + cloud['images'] = images + cloud['imageBundles'] = {} + for i in images: + if i.type != 'machine' or i.kernel_id is None: continue + name = os.path.dirname(i.location) + detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id} + cloud['imageBundles'][name] = detail + + # Initialize sqlite3 database and tables. dbPath = '/etc/sfa/db' dbName = 'euca_aggregate.db' if not os.path.isdir(dbPath): - print >>sys.stderr, '%s not found. Creating directory ...' % dbPath + logger.info('%s not found. Creating directory ...' % dbPath) os.mkdir(dbPath) conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName)) sqlhub.processConnection = conn Slice.createTable(ifNotExists=True) EucaInstance.createTable(ifNotExists=True) + IP.createTable(ifNotExists=True) + + # Start the update thread to keep track of the meta data + # about Eucalyptus instance. + Thread(target=updateMeta).start() # Make sure the schema exists. if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA): err = 'Cannot location schema at %s' % EUCALYPTUS_RSPEC_SCHEMA - print >>sys.stderr, err + logger.error(err) raise Exception(err) ## @@ -136,10 +185,11 @@ def getEucaConnection(): useSSL = False srvPath = '/' eucaPort = 8773 + logger = logging.getLogger('EucaAggregate') if not accessKey or not secretKey or not eucaURL: - print >>sys.stderr, 'Please set ALL of the required environment ' \ - 'variables by sourcing the eucarc file.' + logger.error('Please set ALL of the required environment ' \ + 'variables by sourcing the eucarc file.') return None # Split the url into parts @@ -163,6 +213,34 @@ def getEucaConnection(): port=eucaPort, path=srvPath) +## +# Returns a string of keys that belong to the users of the given slice. +# @param sliceHRN The hunman readable name of the slice. +# @return sting() +# +def getKeysForSlice(api, sliceHRN): + logger = logging.getLogger('EucaAggregate') + cred = api.getCredential() + registry = api.registries[api.hrn] + keys = [] + + # Get the slice record + records = registry.Resolve(sliceHRN, cred) + if not records: + logging.warn('Cannot find any record for slice %s' % sliceHRN) + return [] + + # Find who can log into this slice + persons = records[0]['persons'] + + # Extract the keys from persons records + for p in persons: + sliceUser = registry.Resolve(p, cred) + userKeys = sliceUser[0]['keys'] + keys += userKeys + + return ''.join(keys) + ## # A class that builds the RSpec for Eucalyptus. # @@ -230,8 +308,6 @@ class EucaRSpecBuilder(object): xml << str(inst[4]) with xml.disk_space(unit='GB'): xml << str(inst[5]) - if inst[0] == 'm1.small': - self.__requestXML(1, 'emi-88760F45', 'eki-F26610C6', 'cortex') if 'instances' in cloud and inst[0] in cloud['instances']: existingEucaInstances = cloud['instances'][inst[0]] with xml.euca_instances: @@ -241,8 +317,13 @@ class EucaRSpecBuilder(object): xml << eucaInst['state'] with xml.public_dns: xml << eucaInst['public_dns'] - with xml.keypair: - xml << eucaInst['key'] + + def __imageBundleXML(self, bundles): + xml = self.eucaRSpec + with xml.bundles: + for bundle in bundles.keys(): + with xml.bundle(id=bundle): + xml << '' ## # Creates the Images stanza. @@ -279,18 +360,20 @@ class EucaRSpecBuilder(object): # Generates the RSpec. # def toXML(self): + logger = logging.getLogger('EucaAggregate') if not self.cloudInfo: - print >>sys.stderr, 'No cloud information' + logger.error('No cloud information') return '' xml = self.eucaRSpec cloud = self.cloudInfo with xml.RSpec(type='eucalyptus'): - with xml.cloud(id=cloud['name']): + with xml.network(id=cloud['name']): with xml.ipv4: xml << cloud['ip'] - self.__keyPairsXML(cloud['keypairs']) - self.__imagesXML(cloud['images']) + #self.__keyPairsXML(cloud['keypairs']) + #self.__imagesXML(cloud['images']) + self.__imageBundleXML(cloud['imageBundles']) self.__clustersXML(cloud['clusters']) return str(xml) @@ -334,13 +417,24 @@ class ZoneResultParser(object): return clusterList -def get_rspec(api, xrn, origin_hrn): +def ListResources(api, creds, options, call_id): + if Callids().already_handled(call_id): return "" global cloud - hrn = urn_to_hrn(xrn)[0] + # get slice's hrn from options + xrn = options.get('geni_slice_urn', '') + hrn, type = urn_to_hrn(xrn) + logger = logging.getLogger('EucaAggregate') + + # get hrn of the original caller + origin_hrn = options.get('origin_hrn', None) + if not origin_hrn: + origin_hrn = Credential(string=creds).get_gid_caller().get_hrn() + # origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn() + conn = getEucaConnection() if not conn: - print >>sys.stderr, 'Error: Cannot create a connection to Eucalyptus' + logger.error('Cannot create a connection to Eucalyptus') return 'Cannot create a connection to Eucalyptus' try: @@ -353,6 +447,12 @@ def get_rspec(api, xrn, origin_hrn): # Images images = conn.get_all_images() cloud['images'] = images + cloud['imageBundles'] = {} + for i in images: + if i.type != 'machine' or i.kernel_id is None: continue + name = os.path.dirname(i.location) + detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id} + cloud['imageBundles'][name] = detail # Key Pairs keyPairs = conn.get_all_key_pairs() @@ -365,7 +465,13 @@ def get_rspec(api, xrn, origin_hrn): # Get the instances that belong to the given slice from sqlite3 # XXX use getOne() in production because the slice's hrn is supposed # to be unique. For testing, uniqueness is turned off in the db. - theSlice = list(Slice.select(Slice.q.slice_hrn == hrn))[-1] + # If the slice isn't found in the database, create a record for the + # slice. + matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn)) + if matchedSlices: + theSlice = matchedSlices[-1] + else: + theSlice = Slice(slice_hrn = hrn) for instance in theSlice.instances: instanceId.append(instance.instance_id) @@ -378,7 +484,7 @@ def get_rspec(api, xrn, origin_hrn): for instance in reservation.instances: instances.append(instance) - # Construct a dictory for the EucaRSpecBuilder + # Construct a dictionary for the EucaRSpecBuilder instancesDict = {} for instance in instances: instList = instancesDict.setdefault(instance.instance_type, []) @@ -395,7 +501,7 @@ def get_rspec(api, xrn, origin_hrn): except EC2ResponseError, ec2RespErr: errTree = ET.fromstring(ec2RespErr.body) errMsgE = errTree.find('.//Message') - print >>sys.stderr, errMsgE.text + logger.error(errMsgE.text) rspec = EucaRSpecBuilder(cloud).toXML() @@ -409,19 +515,27 @@ def get_rspec(api, xrn, origin_hrn): """ Hook called via 'sfi.py create' """ -def create_slice(api, xrn, xml): +def CreateSliver(api, xrn, creds, xml, users, call_id): + if Callids().already_handled(call_id): return "" + global cloud hrn = urn_to_hrn(xrn)[0] + logger = logging.getLogger('EucaAggregate') conn = getEucaConnection() if not conn: - print >>sys.stderr, 'Error: Cannot create a connection to Eucalyptus' - return False + logger.error('Cannot create a connection to Eucalyptus') + return "" # Validate RSpec schemaXML = ET.parse(EUCALYPTUS_RSPEC_SCHEMA) rspecValidator = ET.RelaxNG(schemaXML) rspecXML = ET.XML(xml) + for network in rspecXML.iterfind("./network"): + if network.get('id') != cloud['name']: + # Throw away everything except my own RSpec + # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id')) + network.getparent().remove(network) if not rspecValidator(rspecXML): error = rspecValidator.error_log.last_error message = '%s (line %s)' % (error.message, error.line) @@ -438,57 +552,125 @@ def create_slice(api, xrn, xml): pendingRmInst = [] for sliceInst in s.instances: pendingRmInst.append(sliceInst.instance_id) - existingInstGroup = rspecXML.findall('.//euca_instances') + existingInstGroup = rspecXML.findall(".//euca_instances") for instGroup in existingInstGroup: for existingInst in instGroup: if existingInst.get('id') in pendingRmInst: pendingRmInst.remove(existingInst.get('id')) for inst in pendingRmInst: - print >>sys.stderr, 'Instance %s will be terminated' % inst + logger.debug('Instance %s will be terminated' % inst) dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None) - dbInst.destroySelf() + # Only change the state but do not remove the entry from the DB. + dbInst.meta.state = 'deleted' + #dbInst.destroySelf() conn.terminate_instances(pendingRmInst) # Process new instance requests - requests = rspecXML.findall('.//request') + requests = rspecXML.findall(".//request") + if requests: + # Get all the public keys associate with slice. + pubKeys = getKeysForSlice(api, s.slice_hrn) + logger.debug('Passing the following keys to the instance:\n%s' % pubKeys) for req in requests: vmTypeElement = req.getparent() instType = vmTypeElement.get('name') numInst = int(req.find('instances').text) - instKernel = req.find('kernel_image').get('id') - instDiskImg = req.find('disk_image').get('id') - instKey = req.find('keypair').text - ramDiskElement = req.find('ramdisk') - ramDiskAttr = ramDiskElement.attrib - if 'id' in ramDiskAttr: - instRamDisk = ramDiskAttr['id'] - else: - instRamDisk = None + bundleName = req.find('bundle').text + if not cloud['imageBundles'][bundleName]: + logger.error('Cannot find bundle %s' % bundleName) + bundleInfo = cloud['imageBundles'][bundleName] + instKernel = bundleInfo['kernelID'] + instDiskImg = bundleInfo['imageID'] + instRamDisk = bundleInfo['ramdiskID'] + instKey = None # Create the instances for i in range(0, numInst): - eucaInst = EucaInstance(slice = s, - kernel_id = instKernel, - image_id = instDiskImg, + eucaInst = EucaInstance(slice = s, + kernel_id = instKernel, + image_id = instDiskImg, ramdisk_id = instRamDisk, - key_pair = instKey, - inst_type = instType) - eucaInst.reserveInstance(conn) + key_pair = instKey, + inst_type = instType, + meta = Meta(start_time=datetime.datetime.now())) + eucaInst.reserveInstance(conn, pubKeys) - return True + # xxx - should return altered rspec + # with enough data for the client to understand what's happened + return xml + +## +# A thread that will update the meta data. +# +def updateMeta(): + logger = logging.getLogger('EucaAggregate') + while True: + sleep(120) + + # Get IDs of the instances that don't have IPs yet. + dbResults = Meta.select( + AND(Meta.q.pri_addr == None, + Meta.q.state != 'deleted') + ) + dbResults = list(dbResults) + logger.debug('[update thread] dbResults: %s' % dbResults) + instids = [] + for r in dbResults: + instids.append(r.instance.instance_id) + logger.debug('[update thread] Instance Id: %s' % ', '.join(instids)) + + # Get instance information from Eucalyptus + conn = getEucaConnection() + vmInstances = [] + reservations = conn.get_all_instances(instids) + for reservation in reservations: + vmInstances += reservation.instances + + # Check the IPs + instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name} + for i in vmInstances if i.private_dns_name != '0.0.0.0' ] + logger.debug('[update thread] IP dict: %s' % str(instIPs)) + + # Update the local DB + for ipData in instIPs: + dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None) + if not dbInst: + logger.info('[update thread] Could not find %s in DB' % ipData['id']) + continue + dbInst.meta.pri_addr = ipData['pri_addr'] + dbInst.meta.pub_addr = ipData['pub_addr'] + dbInst.meta.state = 'running' + +def GetVersion(api): + xrn=Xrn(api.hrn) + request_rspec_versions = [dict(sfa_rspec_version)] + ad_rspec_versions = [dict(sfa_rspec_version)] + version_more = {'interface':'aggregate', + 'testbed':'myplc', + 'hrn':xrn.get_hrn(), + 'request_rspec_versions': request_rspec_versions, + 'ad_rspec_versions': ad_rspec_versions, + 'default_ad_rspec': dict(sfa_rspec_version) + } + return version_core(version_more) def main(): init_server() - theRSpec = None - with open(sys.argv[1]) as xml: - theRSpec = xml.read() - create_slice(None, 'planetcloud.pc.test', theRSpec) + #theRSpec = None + #with open(sys.argv[1]) as xml: + # theRSpec = xml.read() + #CreateSliver(None, 'planetcloud.pc.test', theRSpec, 'call-id-cloudtest') - #rspec = get_rspec('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy') + #rspec = ListResources('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy', 'test_euca') #print rspec + server_key_file = '/var/lib/sfa/authorities/server.key' + server_cert_file = '/var/lib/sfa/authorities/server.cert' + api = SfaAPI(key_file = server_key_file, cert_file = server_cert_file, interface='aggregate') + print getKeysForSlice(api, 'gc.gc.test1') + if __name__ == "__main__": main()