eucalyptus aggregate (last) manager roughly implemented as a class
authorThierry Parmentelat <thierry.parmentelat@sophia.inria.fr>
Tue, 8 Nov 2011 00:08:31 +0000 (01:08 +0100)
committerThierry Parmentelat <thierry.parmentelat@sophia.inria.fr>
Tue, 8 Nov 2011 00:08:31 +0000 (01:08 +0100)
sfa/managers/aggregate_manager_eucalyptus.py

index 3f04ce9..4d73ab1 100644 (file)
@@ -15,11 +15,12 @@ from xmlbuilder import XMLBuilder
 from lxml import etree as ET
 from sqlobject import *
 
-from sfa.util.faults import *
+from sfa.util.faults import InvalidRSpec, 
 from sfa.util.xrn import urn_to_hrn, Xrn
 from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
 from sfa.util.callids import Callids
-from sfa.util.sfalogging import logger
+#comes with its own logging
+#from sfa.util.sfalogging import logger
 from sfa.util.version import version_core
 
 from sfa.trust.credential import Credential
@@ -27,21 +28,9 @@ from sfa.trust.credential import Credential
 from sfa.server.sfaapi import SfaApi
 
 from sfa.plc.aggregate import Aggregate
-from sfa.plc.slices import *
-from sfa.rspecs.sfa_rspec import sfa_rspec_version
-
-
-##
-# The data structure used to represent a cloud.
-# It contains the cloud name, its ip address, image information,
-# key pairs, and clusters information.
-#
-cloud = {}
-
-##
-# The location of the RelaxNG schema.
-#
-EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
+from sfa.plc.slices import Slice, Slices
+# not sure what this used to be nor where it is now defined
+#from sfa.rspecs.sfa_rspec import sfa_rspec_version
 
 ##
 # Meta data of an instance.
@@ -80,9 +69,6 @@ class EucaInstance(SQLObject):
                     (self.image_id, self.kernel_id, self.ramdisk_id,
                     self.inst_type, self.key_pair))
 
-        # XXX The return statement is for testing. REMOVE in production
-        #return
-
         try:
             reservation = botoConn.run_instances(self.image_id,
                                                  kernel_id = self.kernel_id,
@@ -109,143 +95,6 @@ class Slice(SQLObject):
     #slice_index = DatabaseIndex('slice_hrn')
     instances = MultipleJoin('EucaInstance')
 
-##
-# Initialize the aggregate manager by reading a configuration file.
-#
-def init_server():
-    logger = logging.getLogger('EucaAggregate')
-    fileHandler = logging.FileHandler('/var/log/euca.log')
-    fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
-    logger.addHandler(fileHandler)
-    fileHandler.setLevel(logging.DEBUG)
-    logger.setLevel(logging.DEBUG)
-
-    configParser = ConfigParser()
-    configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
-    if len(configParser.sections()) < 1:
-        logger.error('No cloud defined in the config file')
-        raise Exception('Cannot find cloud definition in configuration file.')
-
-    # Only read the first section.
-    cloudSec = configParser.sections()[0]
-    cloud['name'] = cloudSec
-    cloud['access_key'] = configParser.get(cloudSec, 'access_key')
-    cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
-    cloud['cloud_url']  = configParser.get(cloudSec, 'cloud_url')
-    cloudURL = cloud['cloud_url']
-    if cloudURL.find('https://') >= 0:
-        cloudURL = cloudURL.replace('https://', '')
-    elif cloudURL.find('http://') >= 0:
-        cloudURL = cloudURL.replace('http://', '')
-    (cloud['ip'], parts) = cloudURL.split(':')
-
-    # Create image bundles
-    images = getEucaConnection().get_all_images()
-    cloud['images'] = images
-    cloud['imageBundles'] = {}
-    for i in images:
-        if i.type != 'machine' or i.kernel_id is None: continue
-        name = os.path.dirname(i.location)
-        detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
-        cloud['imageBundles'][name] = detail
-
-    # Initialize sqlite3 database and tables.
-    dbPath = '/etc/sfa/db'
-    dbName = 'euca_aggregate.db'
-
-    if not os.path.isdir(dbPath):
-        logger.info('%s not found. Creating directory ...' % dbPath)
-        os.mkdir(dbPath)
-
-    conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
-    sqlhub.processConnection = conn
-    Slice.createTable(ifNotExists=True)
-    EucaInstance.createTable(ifNotExists=True)
-    Meta.createTable(ifNotExists=True)
-
-    # Start the update process to keep track of the meta data
-    # about Eucalyptus instance.
-    Process(target=updateMeta).start()
-
-    # Make sure the schema exists.
-    if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA):
-        err = 'Cannot location schema at %s' % EUCALYPTUS_RSPEC_SCHEMA
-        logger.error(err)
-        raise Exception(err)
-
-##
-# Creates a connection to Eucalytpus. This function is inspired by 
-# the make_connection() in Euca2ools.
-#
-# @return A connection object or None
-#
-def getEucaConnection():
-    global cloud
-    accessKey = cloud['access_key']
-    secretKey = cloud['secret_key']
-    eucaURL   = cloud['cloud_url']
-    useSSL    = False
-    srvPath   = '/'
-    eucaPort  = 8773
-    logger    = logging.getLogger('EucaAggregate')
-
-    if not accessKey or not secretKey or not eucaURL:
-        logger.error('Please set ALL of the required environment ' \
-                     'variables by sourcing the eucarc file.')
-        return None
-    
-    # Split the url into parts
-    if eucaURL.find('https://') >= 0:
-        useSSL  = True
-        eucaURL = eucaURL.replace('https://', '')
-    elif eucaURL.find('http://') >= 0:
-        useSSL  = False
-        eucaURL = eucaURL.replace('http://', '')
-    (eucaHost, parts) = eucaURL.split(':')
-    if len(parts) > 1:
-        parts = parts.split('/')
-        eucaPort = int(parts[0])
-        parts = parts[1:]
-        srvPath = '/'.join(parts)
-
-    return boto.connect_ec2(aws_access_key_id=accessKey,
-                            aws_secret_access_key=secretKey,
-                            is_secure=useSSL,
-                            region=RegionInfo(None, 'eucalyptus', eucaHost), 
-                            port=eucaPort,
-                            path=srvPath)
-
-##
-# Returns a string of keys that belong to the users of the given slice.
-# @param sliceHRN The hunman readable name of the slice.
-# @return sting()
-#
-# This method is no longer needed because the user keys are passed into
-# CreateSliver
-#
-def getKeysForSlice(api, sliceHRN):
-    logger   = logging.getLogger('EucaAggregate')
-    cred     = api.getCredential()
-    registry = api.registries[api.hrn]
-    keys     = []
-
-    # Get the slice record
-    records = registry.Resolve(sliceHRN, cred)
-    if not records:
-        logging.warn('Cannot find any record for slice %s' % sliceHRN)
-        return []
-
-    # Find who can log into this slice
-    persons = records[0]['persons']
-
-    # Extract the keys from persons records
-    for p in persons:
-        sliceUser = registry.Resolve(p, cred)
-        userKeys = sliceUser[0]['keys']
-        keys += userKeys
-
-    return '\n'.join(keys)
-
 ##
 # A class that builds the RSpec for Eucalyptus.
 #
@@ -422,322 +271,423 @@ class ZoneResultParser(object):
 
         return clusterList
 
-def ListResources(api, creds, options, call_id): 
-    if Callids().already_handled(call_id): return ""
-    global cloud
-    # get slice's hrn from options
-    xrn = options.get('geni_slice_urn', '')
-    hrn, type = urn_to_hrn(xrn)
-    logger = logging.getLogger('EucaAggregate')
-
-    # get hrn of the original caller
-    origin_hrn = options.get('origin_hrn', None)
-    if not origin_hrn:
-        origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
-
-    conn = getEucaConnection()
-
-    if not conn:
-        logger.error('Cannot create a connection to Eucalyptus')
-        return 'Cannot create a connection to Eucalyptus'
-
-    try:
-        # Zones
-        zones = conn.get_all_zones(['verbose'])
-        p = ZoneResultParser(zones)
-        clusters = p.parse()
-        cloud['clusters'] = clusters
-        
-        # Images
-        images = conn.get_all_images()
-        cloud['images'] = images
-        cloud['imageBundles'] = {}
+class AggregateManagerEucalyptus:
+
+    # The data structure used to represent a cloud.
+    # It contains the cloud name, its ip address, image information,
+    # key pairs, and clusters information.
+    cloud = {}
+    
+    # The location of the RelaxNG schema.
+    EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
+    
+    _inited=False
+
+    # the init_server mechanism has vanished
+    def __init__ (self):
+        if AggregateManagerEucalyptus._inited: return
+        AggregateManagerEucalyptus.init_server()
+
+    # Initialize the aggregate manager by reading a configuration file.
+    @staticmethod
+    def init_server():
+        logger = logging.getLogger('EucaAggregate')
+        fileHandler = logging.FileHandler('/var/log/euca.log')
+        fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+        logger.addHandler(fileHandler)
+        fileHandler.setLevel(logging.DEBUG)
+        logger.setLevel(logging.DEBUG)
+
+        configParser = ConfigParser()
+        configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
+        if len(configParser.sections()) < 1:
+            logger.error('No cloud defined in the config file')
+            raise Exception('Cannot find cloud definition in configuration file.')
+    
+        # Only read the first section.
+        cloudSec = configParser.sections()[0]
+        AggregateManagerEucalyptus.cloud['name'] = cloudSec
+        AggregateManagerEucalyptus.cloud['access_key'] = configParser.get(cloudSec, 'access_key')
+        AggregateManagerEucalyptus.cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
+        AggregateManagerEucalyptus.cloud['cloud_url']  = configParser.get(cloudSec, 'cloud_url')
+        cloudURL = AggregateManagerEucalyptus.cloud['cloud_url']
+        if cloudURL.find('https://') >= 0:
+            cloudURL = cloudURL.replace('https://', '')
+        elif cloudURL.find('http://') >= 0:
+            cloudURL = cloudURL.replace('http://', '')
+        (AggregateManagerEucalyptus.cloud['ip'], parts) = cloudURL.split(':')
+    
+        # Create image bundles
+        images = self.getEucaConnection().get_all_images()
+        AggregateManagerEucalyptus.cloud['images'] = images
+        AggregateManagerEucalyptus.cloud['imageBundles'] = {}
         for i in images:
             if i.type != 'machine' or i.kernel_id is None: continue
             name = os.path.dirname(i.location)
             detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
-            cloud['imageBundles'][name] = detail
-
-        # Key Pairs
-        keyPairs = conn.get_all_key_pairs()
-        cloud['keypairs'] = keyPairs
-
-        if hrn:
-            instanceId = []
-            instances  = []
-
-            # Get the instances that belong to the given slice from sqlite3
-            # XXX use getOne() in production because the slice's hrn is supposed
-            # to be unique. For testing, uniqueness is turned off in the db.
-            # If the slice isn't found in the database, create a record for the 
-            # slice.
-            matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
-            if matchedSlices:
-                theSlice = matchedSlices[-1]
-            else:
-                theSlice = Slice(slice_hrn = hrn)
-            for instance in theSlice.instances:
-                instanceId.append(instance.instance_id)
-
-            # Get the information about those instances using their ids.
-            if len(instanceId) > 0:
-                reservations = conn.get_all_instances(instanceId)
-            else:
-                reservations = []
+            AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
+    
+        # Initialize sqlite3 database and tables.
+        dbPath = '/etc/sfa/db'
+        dbName = 'euca_aggregate.db'
+    
+        if not os.path.isdir(dbPath):
+            logger.info('%s not found. Creating directory ...' % dbPath)
+            os.mkdir(dbPath)
+    
+        conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
+        sqlhub.processConnection = conn
+        Slice.createTable(ifNotExists=True)
+        EucaInstance.createTable(ifNotExists=True)
+        Meta.createTable(ifNotExists=True)
+    
+        # Start the update process to keep track of the meta data
+        # about Eucalyptus instance.
+        Process(target=AggregateManagerEucalyptus.updateMeta).start()
+    
+        # Make sure the schema exists.
+        if not os.path.exists(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA):
+            err = 'Cannot location schema at %s' % AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA
+            logger.error(err)
+            raise Exception(err)
+    
+    #
+    # A separate process that will update the meta data.
+    #
+    @staticmethod    
+    def updateMeta():
+        logger = logging.getLogger('EucaMeta')
+        fileHandler = logging.FileHandler('/var/log/euca_meta.log')
+        fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+        logger.addHandler(fileHandler)
+        fileHandler.setLevel(logging.DEBUG)
+        logger.setLevel(logging.DEBUG)
+    
+        while True:
+            sleep(30)
+    
+            # Get IDs of the instances that don't have IPs yet.
+            dbResults = Meta.select(
+                          AND(Meta.q.pri_addr == None,
+                              Meta.q.state    != 'deleted')
+                        )
+            dbResults = list(dbResults)
+            logger.debug('[update process] dbResults: %s' % dbResults)
+            instids = []
+            for r in dbResults:
+                if not r.instance:
+                    continue
+                instids.append(r.instance.instance_id)
+            logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
+    
+            # Get instance information from Eucalyptus
+            conn = self.getEucaConnection()
+            vmInstances = []
+            reservations = conn.get_all_instances(instids)
             for reservation in reservations:
-                for instance in reservation.instances:
-                    instances.append(instance)
-
-            # Construct a dictionary for the EucaRSpecBuilder
-            instancesDict = {}
-            for instance in instances:
-                instList = instancesDict.setdefault(instance.instance_type, [])
-                instInfoDict = {} 
-
-                instInfoDict['id'] = instance.id
-                instInfoDict['public_dns'] = instance.public_dns_name
-                instInfoDict['state'] = instance.state
-                instInfoDict['key'] = instance.key_name
-
-                instList.append(instInfoDict)
-            cloud['instances'] = instancesDict
-
-    except EC2ResponseError, ec2RespErr:
-        errTree = ET.fromstring(ec2RespErr.body)
-        errMsgE = errTree.find('.//Message')
-        logger.error(errMsgE.text)
-
-    rspec = EucaRSpecBuilder(cloud).toXML()
-
-    # Remove the instances records so next time they won't 
-    # show up.
-    if 'instances' in cloud:
-        del cloud['instances']
-
-    return rspec
-
-"""
-Hook called via 'sfi.py create'
-"""
-def CreateSliver(api, slice_xrn, creds, xml, users, call_id):
-    if Callids().already_handled(call_id): return ""
-
-    global cloud
-    logger = logging.getLogger('EucaAggregate')
-    logger.debug("In CreateSliver")
-
-    aggregate = Aggregate(api)
-    slices = Slices(api)
-    (hrn, type) = urn_to_hrn(slice_xrn)
-    peer = slices.get_peer(hrn)
-    sfa_peer = slices.get_sfa_peer(hrn)
-    slice_record=None
-    if users:
-        slice_record = users[0].get('slice_record', {})
-
-    conn = getEucaConnection()
-    if not conn:
-        logger.error('Cannot create a connection to Eucalyptus')
-        return ""
-
-    # Validate RSpec
-    schemaXML = ET.parse(EUCALYPTUS_RSPEC_SCHEMA)
-    rspecValidator = ET.RelaxNG(schemaXML)
-    rspecXML = ET.XML(xml)
-    for network in rspecXML.iterfind("./network"):
-        if network.get('name') != cloud['name']:
-            # Throw away everything except my own RSpec
-            # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
-            network.getparent().remove(network)
-    if not rspecValidator(rspecXML):
-        error = rspecValidator.error_log.last_error
-        message = '%s (line %s)' % (error.message, error.line) 
-        raise InvalidRSpec(message)
-
+                vmInstances += reservation.instances
+    
+            # Check the IPs
+            instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
+                        for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
+            logger.debug('[update process] IP dict: %s' % str(instIPs))
+    
+            # Update the local DB
+            for ipData in instIPs:
+                dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
+                if not dbInst:
+                    logger.info('[update process] Could not find %s in DB' % ipData['id'])
+                    continue
+                dbInst.meta.pri_addr = ipData['pri_addr']
+                dbInst.meta.pub_addr = ipData['pub_addr']
+                dbInst.meta.state    = 'running'
+    
+            self.dumpinstanceInfo()
+    
+    ##
+    # Creates a connection to Eucalytpus. This function is inspired by 
+    # the make_connection() in Euca2ools.
+    #
+    # @return A connection object or None
+    #
+    def getEucaConnection():
+        accessKey = AggregateManagerEucalyptus.cloud['access_key']
+        secretKey = AggregateManagerEucalyptus.cloud['secret_key']
+        eucaURL   = AggregateManagerEucalyptus.cloud['cloud_url']
+        useSSL    = False
+        srvPath   = '/'
+        eucaPort  = 8773
+        logger    = logging.getLogger('EucaAggregate')
+    
+        if not accessKey or not secretKey or not eucaURL:
+            logger.error('Please set ALL of the required environment ' \
+                         'variables by sourcing the eucarc file.')
+            return None
+        
+        # Split the url into parts
+        if eucaURL.find('https://') >= 0:
+            useSSL  = True
+            eucaURL = eucaURL.replace('https://', '')
+        elif eucaURL.find('http://') >= 0:
+            useSSL  = False
+            eucaURL = eucaURL.replace('http://', '')
+        (eucaHost, parts) = eucaURL.split(':')
+        if len(parts) > 1:
+            parts = parts.split('/')
+            eucaPort = int(parts[0])
+            parts = parts[1:]
+            srvPath = '/'.join(parts)
+    
+        return boto.connect_ec2(aws_access_key_id=accessKey,
+                                aws_secret_access_key=secretKey,
+                                is_secure=useSSL,
+                                region=RegionInfo(None, 'eucalyptus', eucaHost), 
+                                port=eucaPort,
+                                path=srvPath)
+    
+    def ListResources(api, creds, options, call_id): 
+        if Callids().already_handled(call_id): return ""
+        # get slice's hrn from options
+        xrn = options.get('geni_slice_urn', '')
+        hrn, type = urn_to_hrn(xrn)
+        logger = logging.getLogger('EucaAggregate')
+    
+        # get hrn of the original caller
+        origin_hrn = options.get('origin_hrn', None)
+        if not origin_hrn:
+            origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
+    
+        conn = self.getEucaConnection()
+    
+        if not conn:
+            logger.error('Cannot create a connection to Eucalyptus')
+            return 'Cannot create a connection to Eucalyptus'
+    
+        try:
+            # Zones
+            zones = conn.get_all_zones(['verbose'])
+            p = ZoneResultParser(zones)
+            clusters = p.parse()
+            AggregateManagerEucalyptus.cloud['clusters'] = clusters
+            
+            # Images
+            images = conn.get_all_images()
+            AggregateManagerEucalyptus.cloud['images'] = images
+            AggregateManagerEucalyptus.cloud['imageBundles'] = {}
+            for i in images:
+                if i.type != 'machine' or i.kernel_id is None: continue
+                name = os.path.dirname(i.location)
+                detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
+                AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
+    
+            # Key Pairs
+            keyPairs = conn.get_all_key_pairs()
+            AggregateManagerEucalyptus.cloud['keypairs'] = keyPairs
+    
+            if hrn:
+                instanceId = []
+                instances  = []
+    
+                # Get the instances that belong to the given slice from sqlite3
+                # XXX use getOne() in production because the slice's hrn is supposed
+                # to be unique. For testing, uniqueness is turned off in the db.
+                # If the slice isn't found in the database, create a record for the 
+                # slice.
+                matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
+                if matchedSlices:
+                    theSlice = matchedSlices[-1]
+                else:
+                    theSlice = Slice(slice_hrn = hrn)
+                for instance in theSlice.instances:
+                    instanceId.append(instance.instance_id)
+    
+                # Get the information about those instances using their ids.
+                if len(instanceId) > 0:
+                    reservations = conn.get_all_instances(instanceId)
+                else:
+                    reservations = []
+                for reservation in reservations:
+                    for instance in reservation.instances:
+                        instances.append(instance)
+    
+                # Construct a dictionary for the EucaRSpecBuilder
+                instancesDict = {}
+                for instance in instances:
+                    instList = instancesDict.setdefault(instance.instance_type, [])
+                    instInfoDict = {} 
+    
+                    instInfoDict['id'] = instance.id
+                    instInfoDict['public_dns'] = instance.public_dns_name
+                    instInfoDict['state'] = instance.state
+                    instInfoDict['key'] = instance.key_name
+    
+                    instList.append(instInfoDict)
+                AggregateManagerEucalyptus.cloud['instances'] = instancesDict
+    
+        except EC2ResponseError, ec2RespErr:
+            errTree = ET.fromstring(ec2RespErr.body)
+            errMsgE = errTree.find('.//Message')
+            logger.error(errMsgE.text)
+    
+        rspec = EucaRSpecBuilder(AggregateManagerEucalyptus.cloud).toXML()
+    
+        # Remove the instances records so next time they won't 
+        # show up.
+        if 'instances' in AggregateManagerEucalyptus.cloud:
+            del AggregateManagerEucalyptus.cloud['instances']
+    
+        return rspec
+    
     """
-    Create the sliver[s] (slice) at this aggregate.
-    Verify HRN and initialize the slice record in PLC if necessary.
+    Hook called via 'sfi.py create'
     """
-
-    # ensure site record exists
-    site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
-    # ensure slice record exists
-    slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
-    # ensure person records exists
-    persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
-
-    # Get the slice from db or create one.
-    s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
-    if s is None:
-        s = Slice(slice_hrn = hrn)
-
-    # Process any changes in existing instance allocation
-    pendingRmInst = []
-    for sliceInst in s.instances:
-        pendingRmInst.append(sliceInst.instance_id)
-    existingInstGroup = rspecXML.findall(".//euca_instances")
-    for instGroup in existingInstGroup:
-        for existingInst in instGroup:
-            if existingInst.get('id') in pendingRmInst:
-                pendingRmInst.remove(existingInst.get('id'))
-    for inst in pendingRmInst:
-        dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
-        if dbInst.meta.state != 'deleted':
-            logger.debug('Instance %s will be terminated' % inst)
-            # Terminate instances one at a time for robustness
-            conn.terminate_instances([inst])
-            # Only change the state but do not remove the entry from the DB.
-            dbInst.meta.state = 'deleted'
-            #dbInst.destroySelf()
-
-    # Process new instance requests
-    requests = rspecXML.findall(".//request")
-    if requests:
-        # Get all the public keys associate with slice.
-        keys = []
-        for user in users:
-            keys += user['keys']
-            logger.debug("Keys: %s" % user['keys'])
-        pubKeys = '\n'.join(keys)
-        logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
-    for req in requests:
-        vmTypeElement = req.getparent()
-        instType = vmTypeElement.get('name')
-        numInst  = int(req.find('instances').text)
-        
-        bundleName = req.find('bundle').text
-        if not cloud['imageBundles'][bundleName]:
-            logger.error('Cannot find bundle %s' % bundleName)
-        bundleInfo = cloud['imageBundles'][bundleName]
-        instKernel  = bundleInfo['kernelID']
-        instDiskImg = bundleInfo['imageID']
-        instRamDisk = bundleInfo['ramdiskID']
-        instKey     = None
-
-        # Create the instances
-        for i in range(0, numInst):
-            eucaInst = EucaInstance(slice      = s,
-                                    kernel_id  = instKernel,
-                                    image_id   = instDiskImg,
-                                    ramdisk_id = instRamDisk,
-                                    key_pair   = instKey,
-                                    inst_type  = instType,
-                                    meta       = Meta(start_time=datetime.datetime.now()))
-            eucaInst.reserveInstance(conn, pubKeys)
-
-    # xxx - should return altered rspec 
-    # with enough data for the client to understand what's happened
-    return xml
-
-##
-# Return information on the IP addresses bound to each slice's instances
-#
-def dumpInstanceInfo():
-    logger = logging.getLogger('EucaMeta')
-    outdir = "/var/www/html/euca/"
-    outfile = outdir + "instances.txt"
-
-    try:
-        os.makedirs(outdir)
-    except OSError, e:
-        if e.errno != errno.EEXIST:
-            raise
-
-    dbResults = Meta.select(
-        AND(Meta.q.pri_addr != None,
-            Meta.q.state    == 'running')
-        )
-    dbResults = list(dbResults)
-    f = open(outfile, "w")
-    for r in dbResults:
-        instId = r.instance.instance_id
-        ipaddr = r.pri_addr
-        hrn = r.instance.slice.slice_hrn
-        logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
-        f.write("%s %s %s\n" % (instId, ipaddr, hrn))
-    f.close()
-
-##
-# A separate process that will update the meta data.
-#
-def updateMeta():
-    logger = logging.getLogger('EucaMeta')
-    fileHandler = logging.FileHandler('/var/log/euca_meta.log')
-    fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
-    logger.addHandler(fileHandler)
-    fileHandler.setLevel(logging.DEBUG)
-    logger.setLevel(logging.DEBUG)
-
-    while True:
-        sleep(30)
-
-        # Get IDs of the instances that don't have IPs yet.
+    def CreateSliver(api, slice_xrn, creds, xml, users, call_id):
+        if Callids().already_handled(call_id): return ""
+    
+        logger = logging.getLogger('EucaAggregate')
+        logger.debug("In CreateSliver")
+    
+        aggregate = Aggregate(api)
+        slices = Slices(api)
+        (hrn, type) = urn_to_hrn(slice_xrn)
+        peer = slices.get_peer(hrn)
+        sfa_peer = slices.get_sfa_peer(hrn)
+        slice_record=None
+        if users:
+            slice_record = users[0].get('slice_record', {})
+    
+        conn = self.getEucaConnection()
+        if not conn:
+            logger.error('Cannot create a connection to Eucalyptus')
+            return ""
+    
+        # Validate RSpec
+        schemaXML = ET.parse(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA)
+        rspecValidator = ET.RelaxNG(schemaXML)
+        rspecXML = ET.XML(xml)
+        for network in rspecXML.iterfind("./network"):
+            if network.get('name') != AggregateManagerEucalyptus.cloud['name']:
+                # Throw away everything except my own RSpec
+                # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
+                network.getparent().remove(network)
+        if not rspecValidator(rspecXML):
+            error = rspecValidator.error_log.last_error
+            message = '%s (line %s)' % (error.message, error.line) 
+            raise InvalidRSpec(message)
+    
+        """
+        Create the sliver[s] (slice) at this aggregate.
+        Verify HRN and initialize the slice record in PLC if necessary.
+        """
+    
+        # ensure site record exists
+        site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
+        # ensure slice record exists
+        slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
+        # ensure person records exists
+        persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
+    
+        # Get the slice from db or create one.
+        s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
+        if s is None:
+            s = Slice(slice_hrn = hrn)
+    
+        # Process any changes in existing instance allocation
+        pendingRmInst = []
+        for sliceInst in s.instances:
+            pendingRmInst.append(sliceInst.instance_id)
+        existingInstGroup = rspecXML.findall(".//euca_instances")
+        for instGroup in existingInstGroup:
+            for existingInst in instGroup:
+                if existingInst.get('id') in pendingRmInst:
+                    pendingRmInst.remove(existingInst.get('id'))
+        for inst in pendingRmInst:
+            dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
+            if dbInst.meta.state != 'deleted':
+                logger.debug('Instance %s will be terminated' % inst)
+                # Terminate instances one at a time for robustness
+                conn.terminate_instances([inst])
+                # Only change the state but do not remove the entry from the DB.
+                dbInst.meta.state = 'deleted'
+                #dbInst.destroySelf()
+    
+        # Process new instance requests
+        requests = rspecXML.findall(".//request")
+        if requests:
+            # Get all the public keys associate with slice.
+            keys = []
+            for user in users:
+                keys += user['keys']
+                logger.debug("Keys: %s" % user['keys'])
+            pubKeys = '\n'.join(keys)
+            logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
+        for req in requests:
+            vmTypeElement = req.getparent()
+            instType = vmTypeElement.get('name')
+            numInst  = int(req.find('instances').text)
+            
+            bundleName = req.find('bundle').text
+            if not AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]:
+                logger.error('Cannot find bundle %s' % bundleName)
+            bundleInfo = AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]
+            instKernel  = bundleInfo['kernelID']
+            instDiskImg = bundleInfo['imageID']
+            instRamDisk = bundleInfo['ramdiskID']
+            instKey     = None
+    
+            # Create the instances
+            for i in range(0, numInst):
+                eucaInst = EucaInstance(slice      = s,
+                                        kernel_id  = instKernel,
+                                        image_id   = instDiskImg,
+                                        ramdisk_id = instRamDisk,
+                                        key_pair   = instKey,
+                                        inst_type  = instType,
+                                        meta       = Meta(start_time=datetime.datetime.now()))
+                eucaInst.reserveInstance(conn, pubKeys)
+    
+        # xxx - should return altered rspec 
+        # with enough data for the client to understand what's happened
+        return xml
+    
+    ##
+    # Return information on the IP addresses bound to each slice's instances
+    #
+    def dumpInstanceInfo():
+        logger = logging.getLogger('EucaMeta')
+        outdir = "/var/www/html/euca/"
+        outfile = outdir + "instances.txt"
+    
+        try:
+            os.makedirs(outdir)
+        except OSError, e:
+            if e.errno != errno.EEXIST:
+                raise
+    
         dbResults = Meta.select(
-                      AND(Meta.q.pri_addr == None,
-                          Meta.q.state    != 'deleted')
-                    )
+            AND(Meta.q.pri_addr != None,
+                Meta.q.state    == 'running')
+            )
         dbResults = list(dbResults)
-        logger.debug('[update process] dbResults: %s' % dbResults)
-        instids = []
+        f = open(outfile, "w")
         for r in dbResults:
-            if not r.instance:
-                continue
-            instids.append(r.instance.instance_id)
-        logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
-
-        # Get instance information from Eucalyptus
-        conn = getEucaConnection()
-        vmInstances = []
-        reservations = conn.get_all_instances(instids)
-        for reservation in reservations:
-            vmInstances += reservation.instances
-
-        # Check the IPs
-        instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
-                    for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
-        logger.debug('[update process] IP dict: %s' % str(instIPs))
-
-        # Update the local DB
-        for ipData in instIPs:
-            dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
-            if not dbInst:
-                logger.info('[update process] Could not find %s in DB' % ipData['id'])
-                continue
-            dbInst.meta.pri_addr = ipData['pri_addr']
-            dbInst.meta.pub_addr = ipData['pub_addr']
-            dbInst.meta.state    = 'running'
-
-        dumpInstanceInfo()
-
-def GetVersion(api):
-    xrn=Xrn(api.hrn)
-    request_rspec_versions = [dict(sfa_rspec_version)]
-    ad_rspec_versions = [dict(sfa_rspec_version)]
-    version_more = {'interface':'aggregate',
-                    'testbed':'myplc',
-                    'hrn':xrn.get_hrn(),
-                    'request_rspec_versions': request_rspec_versions,
-                    'ad_rspec_versions': ad_rspec_versions,
-                    'default_ad_rspec': dict(sfa_rspec_version)
-                    }
-    return version_core(version_more)
-
-#def main():
-#    init_server()
-#
-#    #theRSpec = None
-#    #with open(sys.argv[1]) as xml:
-#    #    theRSpec = xml.read()
-#    #CreateSliver(None, 'planetcloud.pc.test', theRSpec, 'call-id-cloudtest')
-#
-#    #rspec = ListResources('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy', 'test_euca')
-#    #print rspec
-#
-#    server_key_file = '/var/lib/sfa/authorities/server.key'
-#    server_cert_file = '/var/lib/sfa/authorities/server.cert'
-#    api = PlcSfaApi(key_file = server_key_file, cert_file = server_cert_file, interface='aggregate')
-#    print getKeysForSlice(api, 'gc.gc.test1')
-#
-#if __name__ == "__main__":
-#    main()
+            instId = r.instance.instance_id
+            ipaddr = r.pri_addr
+            hrn = r.instance.slice.slice_hrn
+            logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
+            f.write("%s %s %s\n" % (instId, ipaddr, hrn))
+        f.close()
+    
+    def GetVersion(api):
+        xrn=Xrn(api.hrn)
+        request_rspec_versions = [dict(sfa_rspec_version)]
+        ad_rspec_versions = [dict(sfa_rspec_version)]
+        version_more = {'interface':'aggregate',
+                        'testbed':'myplc',
+                        'hrn':xrn.get_hrn(),
+                        'request_rspec_versions': request_rspec_versions,
+                        'ad_rspec_versions': ad_rspec_versions,
+                        'default_ad_rspec': dict(sfa_rspec_version)
+                        }
+        return version_core(version_more)