import sys
import os
+import logging
+import datetime
import boto
from boto.ec2.regioninfo import RegionInfo
from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
from sfa.util.callids import Callids
+from threading import Thread
+from time import sleep
+
##
# The data structure used to represent a cloud.
# It contains the cloud name, its ip address, image information,
sys.stderr = file('/var/log/euca_agg.log', 'a+')
api = SfaAPI()
+##
+# Meta data of an instance.
+#
+class Meta(SQLObject):
+ instance = SingleJoin('EucaInstance')
+ state = StringCol(default = 'new')
+ pub_addr = StringCol(default = None)
+ pri_addr = StringCol(default = None)
+ start_time = DateTimeCol(default = None)
+
##
# A representation of an Eucalyptus instance. This is a support class
# for instance <-> slice mapping.
ramdisk_id = StringCol()
inst_type = StringCol()
key_pair = StringCol()
- slice = ForeignKey('Slice')
+ slice = ForeignKey('Slice')
+ meta = ForeignKey('Meta')
##
# Contacts Eucalyptus and tries to reserve this instance.
# Initialize the aggregate manager by reading a configuration file.
#
def init_server():
+ logger = logging.getLogger('EucaAggregate')
+ fileHandler = logging.FileHandler('/var/log/euca.log')
+ fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+ logger.addHandler(fileHandler)
+ logger.setLevel(logging.DEBUG)
+
configParser = ConfigParser()
configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
if len(configParser.sections()) < 1:
detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
cloud['imageBundles'][name] = detail
- # Initialize sqlite3 database.
+ # Initialize sqlite3 database and tables.
dbPath = '/etc/sfa/db'
dbName = 'euca_aggregate.db'
sqlhub.processConnection = conn
Slice.createTable(ifNotExists=True)
EucaInstance.createTable(ifNotExists=True)
+ IP.createTable(ifNotExists=True)
# Make sure the schema exists.
if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA):
for inst in pendingRmInst:
print >>sys.stderr, 'Instance %s will be terminated' % inst
dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
- dbInst.destroySelf()
+ # Only change the state but do not remove the entry from the DB.
+ dbInst.meta.state = 'deleted'
+ #dbInst.destroySelf()
conn.terminate_instances(pendingRmInst)
# Process new instance requests
# Create the instances
for i in range(0, numInst):
- eucaInst = EucaInstance(slice = s,
- kernel_id = instKernel,
- image_id = instDiskImg,
+ eucaInst = EucaInstance(slice = s,
+ kernel_id = instKernel,
+ image_id = instDiskImg,
ramdisk_id = instRamDisk,
- key_pair = instKey,
- inst_type = instType)
+ key_pair = instKey,
+ inst_type = instType,
+ meta = Meta(start_time=datetime.datetime.now()))
eucaInst.reserveInstance(conn, pubKeys)
# xxx - should return altered rspec
# with enough data for the client to understand what's happened
return xml
+##
+# A thread that will update the meta data.
+#
+def updateMeta():
+ logger = logging.getLogger('EucaAggregate')
+ while True:
+ sleep(120)
+
+ # Get IDs of the instances that don't have IPs yet.
+ dbResults = Meta.select(
+ AND(Meta.q.pri_addr == None,
+ Meta.q.state != 'deleted')
+ )
+ dbResults = list(dbResults)
+ logger.debug('[update thread] dbResults: %s' % dbResults)
+ instids = []
+ for r in dbResults:
+ instids.append(r.instance.instance_id)
+ logger.debug('[update thread] Instance Id: %s' % ', '.join(instids))
+
+ # Get instance information from Eucalyptus
+ conn = getEucaConnection()
+ vmInstances = []
+ reservations = conn.get_all_instances(instids)
+ for reservation in reservations:
+ vmInstances += reservation.instances
+
+ # Check the IPs
+ instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
+ for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
+ logger.debug('[update thread] IP dict: %s' % str(instIPs))
+
+ # Update the local DB
+ for ipData in instIPs:
+ dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
+ if not dbInst:
+ logger.info('[update thread] Could not find %s in DB' % ipData['id'])
+ continue
+ dbInst.meta.pri_addr = ipData['pri_addr']
+ dbInst.meta.pub_addr = ipData['pub_addr']
+ dbInst.meta.state = 'running'
+
def main():
init_server()