From: Marco Yuen Date: Tue, 21 Jun 2011 17:15:11 +0000 (-0400) Subject: GENICLOUD-25 X-Git-Tag: sfa-1.0-28~12 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=7bec62a300d9e177387a95eb9ae1fa9027e6eb8e;p=sfa.git GENICLOUD-25 Update database to keep track of (IP, time) -> instance mappings. * Create a new table to keep track of meta data of an instance (e.g. state, public ip, private ip, start time). * Implement a thread to poll Eucalyptus for information about instances. * Testing is needed. --- diff --git a/sfa/managers/aggregate_manager_eucalyptus.py b/sfa/managers/aggregate_manager_eucalyptus.py index 68669bd9..ecb0ba3e 100644 --- a/sfa/managers/aggregate_manager_eucalyptus.py +++ b/sfa/managers/aggregate_manager_eucalyptus.py @@ -2,6 +2,8 @@ from __future__ import with_statement import sys import os +import logging +import datetime import boto from boto.ec2.regioninfo import RegionInfo @@ -20,6 +22,9 @@ from sfa.plc.api import SfaAPI from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn from sfa.util.callids import Callids +from threading import Thread +from time import sleep + ## # The data structure used to represent a cloud. # It contains the cloud name, its ip address, image information, @@ -36,6 +41,16 @@ EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng' sys.stderr = file('/var/log/euca_agg.log', 'a+') api = SfaAPI() +## +# Meta data of an instance. +# +class Meta(SQLObject): + instance = SingleJoin('EucaInstance') + state = StringCol(default = 'new') + pub_addr = StringCol(default = None) + pri_addr = StringCol(default = None) + start_time = DateTimeCol(default = None) + ## # A representation of an Eucalyptus instance. This is a support class # for instance <-> slice mapping. @@ -47,7 +62,8 @@ class EucaInstance(SQLObject): ramdisk_id = StringCol() inst_type = StringCol() key_pair = StringCol() - slice = ForeignKey('Slice') + slice = ForeignKey('Slice') + meta = ForeignKey('Meta') ## # Contacts Eucalyptus and tries to reserve this instance. @@ -94,6 +110,12 @@ class Slice(SQLObject): # Initialize the aggregate manager by reading a configuration file. # def init_server(): + logger = logging.getLogger('EucaAggregate') + fileHandler = logging.FileHandler('/var/log/euca.log') + fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + logger.addHandler(fileHandler) + logger.setLevel(logging.DEBUG) + configParser = ConfigParser() configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf']) if len(configParser.sections()) < 1: @@ -123,7 +145,7 @@ def init_server(): detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id} cloud['imageBundles'][name] = detail - # Initialize sqlite3 database. + # Initialize sqlite3 database and tables. dbPath = '/etc/sfa/db' dbName = 'euca_aggregate.db' @@ -135,6 +157,7 @@ def init_server(): sqlhub.processConnection = conn Slice.createTable(ifNotExists=True) EucaInstance.createTable(ifNotExists=True) + IP.createTable(ifNotExists=True) # Make sure the schema exists. if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA): @@ -529,7 +552,9 @@ def CreateSliver(api, xrn, creds, xml, users, call_id): for inst in pendingRmInst: print >>sys.stderr, 'Instance %s will be terminated' % inst dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None) - dbInst.destroySelf() + # Only change the state but do not remove the entry from the DB. + dbInst.meta.state = 'deleted' + #dbInst.destroySelf() conn.terminate_instances(pendingRmInst) # Process new instance requests @@ -555,18 +580,61 @@ def CreateSliver(api, xrn, creds, xml, users, call_id): # Create the instances for i in range(0, numInst): - eucaInst = EucaInstance(slice = s, - kernel_id = instKernel, - image_id = instDiskImg, + eucaInst = EucaInstance(slice = s, + kernel_id = instKernel, + image_id = instDiskImg, ramdisk_id = instRamDisk, - key_pair = instKey, - inst_type = instType) + key_pair = instKey, + inst_type = instType, + meta = Meta(start_time=datetime.datetime.now())) eucaInst.reserveInstance(conn, pubKeys) # xxx - should return altered rspec # with enough data for the client to understand what's happened return xml +## +# A thread that will update the meta data. +# +def updateMeta(): + logger = logging.getLogger('EucaAggregate') + while True: + sleep(120) + + # Get IDs of the instances that don't have IPs yet. + dbResults = Meta.select( + AND(Meta.q.pri_addr == None, + Meta.q.state != 'deleted') + ) + dbResults = list(dbResults) + logger.debug('[update thread] dbResults: %s' % dbResults) + instids = [] + for r in dbResults: + instids.append(r.instance.instance_id) + logger.debug('[update thread] Instance Id: %s' % ', '.join(instids)) + + # Get instance information from Eucalyptus + conn = getEucaConnection() + vmInstances = [] + reservations = conn.get_all_instances(instids) + for reservation in reservations: + vmInstances += reservation.instances + + # Check the IPs + instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name} + for i in vmInstances if i.private_dns_name != '0.0.0.0' ] + logger.debug('[update thread] IP dict: %s' % str(instIPs)) + + # Update the local DB + for ipData in instIPs: + dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None) + if not dbInst: + logger.info('[update thread] Could not find %s in DB' % ipData['id']) + continue + dbInst.meta.pri_addr = ipData['pri_addr'] + dbInst.meta.pub_addr = ipData['pub_addr'] + dbInst.meta.state = 'running' + def main(): init_server()