GENICLOUD-25
[sfa.git] / sfa / managers / aggregate_manager_eucalyptus.py
index 7b50d2a..26e5742 100644 (file)
@@ -2,6 +2,8 @@ from __future__ import with_statement
 
 import sys
 import os
+import logging
+import datetime
 
 import boto
 from boto.ec2.regioninfo import RegionInfo
@@ -12,9 +14,16 @@ from lxml import etree as ET
 from sqlobject import *
 
 from sfa.util.faults import *
-from sfa.util.namespace import urn_to_hrn
+from sfa.util.xrn import urn_to_hrn
 from sfa.util.rspec import RSpec
 from sfa.server.registry import Registries
+from sfa.trust.credential import Credential
+from sfa.plc.api import SfaAPI
+from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
+from sfa.util.callids import Callids
+
+from threading import Thread
+from time import sleep
 
 ##
 # The data structure used to represent a cloud.
@@ -28,6 +37,18 @@ cloud = {}
 #
 EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
 
+api = SfaAPI()
+
+##
+# Meta data of an instance.
+#
+class Meta(SQLObject):
+    instance   = SingleJoin('EucaInstance')
+    state      = StringCol(default = 'new')
+    pub_addr   = StringCol(default = None)
+    pri_addr   = StringCol(default = None)
+    start_time = DateTimeCol(default = None)
+
 ##
 # A representation of an Eucalyptus instance. This is a support class
 # for instance <-> slice mapping.
@@ -39,18 +60,21 @@ class EucaInstance(SQLObject):
     ramdisk_id  = StringCol()
     inst_type   = StringCol()
     key_pair    = StringCol()
-    slice = ForeignKey('Slice')
+    slice       = ForeignKey('Slice')
+    meta        = ForeignKey('Meta')
 
     ##
     # Contacts Eucalyptus and tries to reserve this instance.
     # 
     # @param botoConn A connection to Eucalyptus.
+    # @param pubKeys A list of public keys for the instance.
     #
-    def reserveInstance(self, botoConn):
-        print >>sys.stderr, 'Reserving an instance: image: %s, kernel: ' \
-                            '%s, ramdisk: %s, type: %s, key: %s' % \
-                            (self.image_id, self.kernel_id, self.ramdisk_id, 
-                             self.inst_type, self.key_pair)
+    def reserveInstance(self, botoConn, pubKeys):
+        logger = logging.getLogger('EucaAggregate')
+        logger.info('Reserving an instance: image: %s, kernel: ' \
+                    '%s, ramdisk: %s, type: %s, key: %s' % \
+                    (self.image_id, self.kernel_id, self.ramdisk_id,
+                    self.inst_type, self.key_pair))
 
         # XXX The return statement is for testing. REMOVE in production
         #return
@@ -60,7 +84,8 @@ class EucaInstance(SQLObject):
                                                  kernel_id = self.kernel_id,
                                                  ramdisk_id = self.ramdisk_id,
                                                  instance_type = self.inst_type,
-                                                 key_name  = self.key_pair)
+                                                 key_name  = self.key_pair,
+                                                 user_data = pubKeys)
             for instance in reservation.instances:
                 self.instance_id = instance.id
 
@@ -68,7 +93,7 @@ class EucaInstance(SQLObject):
         except EC2ResponseError, ec2RespErr:
             errTree = ET.fromstring(ec2RespErr.body)
             msg = errTree.find('.//Message')
-            print >>sys.stderr, msg.text
+            logger.error(msg.text)
             self.destroySelf()
 
 ##
@@ -84,10 +109,16 @@ class Slice(SQLObject):
 # Initialize the aggregate manager by reading a configuration file.
 #
 def init_server():
+    logger = logging.getLogger('EucaAggregate')
+    fileHandler = logging.FileHandler('/var/log/euca.log')
+    fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+    logger.addHandler(fileHandler)
+    logger.setLevel(logging.DEBUG)
+
     configParser = ConfigParser()
     configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
     if len(configParser.sections()) < 1:
-        print >>sys.stderr, 'No cloud defined in the config file'
+        logger.error('No cloud defined in the config file')
         raise Exception('Cannot find cloud definition in configuration file.')
 
     # Only read the first section.
@@ -103,23 +134,38 @@ def init_server():
         cloudURL = cloudURL.replace('http://', '')
     (cloud['ip'], parts) = cloudURL.split(':')
 
-    # Initialize sqlite3 database.
+    # Create image bundles
+    images = getEucaConnection().get_all_images()
+    cloud['images'] = images
+    cloud['imageBundles'] = {}
+    for i in images:
+        if i.type != 'machine' or i.kernel_id is None: continue
+        name = os.path.dirname(i.location)
+        detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
+        cloud['imageBundles'][name] = detail
+
+    # Initialize sqlite3 database and tables.
     dbPath = '/etc/sfa/db'
     dbName = 'euca_aggregate.db'
 
     if not os.path.isdir(dbPath):
-        print >>sys.stderr, '%s not found. Creating directory ...' % dbPath
+        logger.info('%s not found. Creating directory ...' % dbPath)
         os.mkdir(dbPath)
 
     conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
     sqlhub.processConnection = conn
     Slice.createTable(ifNotExists=True)
     EucaInstance.createTable(ifNotExists=True)
+    IP.createTable(ifNotExists=True)
+
+    # Start the update thread to keep track of the meta data
+    # about Eucalyptus instance.
+    Thread(target=updateMeta).start()
 
     # Make sure the schema exists.
     if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA):
         err = 'Cannot location schema at %s' % EUCALYPTUS_RSPEC_SCHEMA
-        print >>sys.stderr, err
+        logger.error(err)
         raise Exception(err)
 
 ##
@@ -136,10 +182,11 @@ def getEucaConnection():
     useSSL    = False
     srvPath   = '/'
     eucaPort  = 8773
+    logger    = logging.getLogger('EucaAggregate')
 
     if not accessKey or not secretKey or not eucaURL:
-        print >>sys.stderr, 'Please set ALL of the required environment ' \
-                            'variables by sourcing the eucarc file.'
+        logger.error('Please set ALL of the required environment ' \
+                     'variables by sourcing the eucarc file.')
         return None
     
     # Split the url into parts
@@ -163,6 +210,43 @@ def getEucaConnection():
                             port=eucaPort,
                             path=srvPath)
 
+##
+# Returns a string of keys that belong to the users of the given slice.
+# @param sliceHRN The hunman readable name of the slice.
+# @return sting()
+#
+def getKeysForSlice(sliceHRN):
+    logger = logging.getLogger('EucaAggregate')
+    try:
+        # convert hrn to slice name
+        plSliceName = hrn_to_pl_slicename(sliceHRN)
+    except IndexError, e:
+        logger.error('Invalid slice name (%s)' % sliceHRN)
+        return []
+
+    # Get the slice's information
+    sliceData = api.plshell.GetSlices(api.plauth, {'name':plSliceName})
+    if not sliceData:
+        logger.warn('Cannot get any data for slice %s' % plSliceName)
+        return []
+
+    # It should only return a list with len = 1
+    sliceData = sliceData[0]
+
+    keys = []
+    person_ids = sliceData['person_ids']
+    if not person_ids: 
+        logger.warn('No users in slice %s' % sliceHRN)
+        return []
+
+    persons = api.plshell.GetPersons(api.plauth, person_ids)
+    for person in persons:
+        pkeys = api.plshell.GetKeys(api.plauth, person['key_ids'])
+        for key in pkeys:
+            keys.append(key['key'])
+    return ''.join(keys)
+
 ##
 # A class that builds the RSpec for Eucalyptus.
 #
@@ -230,8 +314,6 @@ class EucaRSpecBuilder(object):
                                 xml << str(inst[4])
                             with xml.disk_space(unit='GB'):
                                 xml << str(inst[5])
-                            if inst[0] == 'm1.small':
-                                self.__requestXML(1, 'emi-88760F45', 'eki-F26610C6', 'cortex')
                             if 'instances' in cloud and inst[0] in cloud['instances']:
                                 existingEucaInstances = cloud['instances'][inst[0]]
                                 with xml.euca_instances:
@@ -241,8 +323,13 @@ class EucaRSpecBuilder(object):
                                                 xml << eucaInst['state']
                                             with xml.public_dns:
                                                 xml << eucaInst['public_dns']
-                                            with xml.keypair:
-                                                xml << eucaInst['key']
+
+    def __imageBundleXML(self, bundles):
+        xml = self.eucaRSpec
+        with xml.bundles:
+            for bundle in bundles.keys():
+                with xml.bundle(id=bundle):
+                    xml << ''
 
     ##
     # Creates the Images stanza.
@@ -279,8 +366,9 @@ class EucaRSpecBuilder(object):
     # Generates the RSpec.
     #
     def toXML(self):
+        logger = logging.getLogger('EucaAggregate')
         if not self.cloudInfo:
-            print >>sys.stderr, 'No cloud information'
+            logger.error('No cloud information')
             return ''
 
         xml = self.eucaRSpec
@@ -289,8 +377,9 @@ class EucaRSpecBuilder(object):
             with xml.cloud(id=cloud['name']):
                 with xml.ipv4:
                     xml << cloud['ip']
-                self.__keyPairsXML(cloud['keypairs'])
-                self.__imagesXML(cloud['images'])
+                #self.__keyPairsXML(cloud['keypairs'])
+                #self.__imagesXML(cloud['images'])
+                self.__imageBundleXML(cloud['imageBundles'])
                 self.__clustersXML(cloud['clusters'])
         return str(xml)
 
@@ -334,11 +423,13 @@ class ZoneResultParser(object):
 
         return clusterList
 
-def get_rspec(api, creds, options): 
+def ListResources(api, creds, options, call_id): 
+    if Callids().already_handled(call_id): return ""
     global cloud
     # get slice's hrn from options
-    xrn = options.get('geni_slice_urn', None)
+    xrn = options.get('geni_slice_urn', '')
     hrn, type = urn_to_hrn(xrn)
+    logger = logging.getLogger('EucaAggregate')
 
     # get hrn of the original caller
     origin_hrn = options.get('origin_hrn', None)
@@ -348,7 +439,7 @@ def get_rspec(api, creds, options):
     conn = getEucaConnection()
 
     if not conn:
-        print >>sys.stderr, 'Error: Cannot create a connection to Eucalyptus'
+        logger.error('Cannot create a connection to Eucalyptus')
         return 'Cannot create a connection to Eucalyptus'
 
     try:
@@ -361,6 +452,12 @@ def get_rspec(api, creds, options):
         # Images
         images = conn.get_all_images()
         cloud['images'] = images
+        cloud['imageBundles'] = {}
+        for i in images:
+            if i.type != 'machine' or i.kernel_id is None: continue
+            name = os.path.dirname(i.location)
+            detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
+            cloud['imageBundles'][name] = detail
 
         # Key Pairs
         keyPairs = conn.get_all_key_pairs()
@@ -392,7 +489,7 @@ def get_rspec(api, creds, options):
                 for instance in reservation.instances:
                     instances.append(instance)
 
-            # Construct a dictory for the EucaRSpecBuilder
+            # Construct a dictionary for the EucaRSpecBuilder
             instancesDict = {}
             for instance in instances:
                 instList = instancesDict.setdefault(instance.instance_type, [])
@@ -409,7 +506,7 @@ def get_rspec(api, creds, options):
     except EC2ResponseError, ec2RespErr:
         errTree = ET.fromstring(ec2RespErr.body)
         errMsgE = errTree.find('.//Message')
-        print >>sys.stderr, errMsgE.text
+        logger.error(errMsgE.text)
 
     rspec = EucaRSpecBuilder(cloud).toXML()
 
@@ -423,14 +520,17 @@ def get_rspec(api, creds, options):
 """
 Hook called via 'sfi.py create'
 """
-def create_slice(api, xrn, creds, xml, users):
+def CreateSliver(api, xrn, creds, xml, users, call_id):
+    if Callids().already_handled(call_id): return ""
+
     global cloud
     hrn = urn_to_hrn(xrn)[0]
+    logger = logging.getLogger('EucaAggregate')
 
     conn = getEucaConnection()
     if not conn:
-        print >>sys.stderr, 'Error: Cannot create a connection to Eucalyptus'
-        return False
+        logger.error('Cannot create a connection to Eucalyptus')
+        return ""
 
     # Validate RSpec
     schemaXML = ET.parse(EUCALYPTUS_RSPEC_SCHEMA)
@@ -458,50 +558,101 @@ def create_slice(api, xrn, creds, xml, users):
             if existingInst.get('id') in pendingRmInst:
                 pendingRmInst.remove(existingInst.get('id'))
     for inst in pendingRmInst:
-        print >>sys.stderr, 'Instance %s will be terminated' % inst
+        logger.debug('Instance %s will be terminated' % inst)
         dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
-        dbInst.destroySelf()
+        # Only change the state but do not remove the entry from the DB.
+        dbInst.meta.state = 'deleted'
+        #dbInst.destroySelf()
     conn.terminate_instances(pendingRmInst)
 
     # Process new instance requests
     requests = rspecXML.findall('.//request')
+    if requests:
+        # Get all the public keys associate with slice.
+        pubKeys = getKeysForSlice(s.slice_hrn)
+        logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
     for req in requests:
         vmTypeElement = req.getparent()
         instType = vmTypeElement.get('name')
         numInst  = int(req.find('instances').text)
-        instKernel  = req.find('kernel_image').get('id')
-        instDiskImg = req.find('disk_image').get('id')
-        instKey     = req.find('keypair').text
         
-        ramDiskElement = req.find('ramdisk')
-        ramDiskAttr    = ramDiskElement.attrib
-        if 'id' in ramDiskAttr:
-            instRamDisk = ramDiskAttr['id']
-        else:
-            instRamDisk = None
+        bundleName = req.find('bundle').text
+        if not cloud['imageBundles'][bundleName]:
+            logger.error('Cannot find bundle %s' % bundleName)
+        bundleInfo = cloud['imageBundles'][bundleName]
+        instKernel  = bundleInfo['kernelID']
+        instDiskImg = bundleInfo['imageID']
+        instRamDisk = bundleInfo['ramdiskID']
+        instKey     = None
 
         # Create the instances
         for i in range(0, numInst):
-            eucaInst = EucaInstance(slice = s, 
-                                    kernel_id = instKernel,
-                                    image_id = instDiskImg,
+            eucaInst = EucaInstance(slice      = s,
+                                    kernel_id  = instKernel,
+                                    image_id   = instDiskImg,
                                     ramdisk_id = instRamDisk,
-                                    key_pair = instKey,
-                                    inst_type = instType)
-            eucaInst.reserveInstance(conn)
+                                    key_pair   = instKey,
+                                    inst_type  = instType,
+                                    meta       = Meta(start_time=datetime.datetime.now()))
+            eucaInst.reserveInstance(conn, pubKeys)
+
+    # xxx - should return altered rspec 
+    # with enough data for the client to understand what's happened
+    return xml
 
-    return True
+##
+# A thread that will update the meta data.
+#
+def updateMeta():
+    logger = logging.getLogger('EucaAggregate')
+    while True:
+        sleep(120)
+
+        # Get IDs of the instances that don't have IPs yet.
+        dbResults = Meta.select(
+                      AND(Meta.q.pri_addr == None,
+                          Meta.q.state    != 'deleted')
+                    )
+        dbResults = list(dbResults)
+        logger.debug('[update thread] dbResults: %s' % dbResults)
+        instids = []
+        for r in dbResults:
+            instids.append(r.instance.instance_id)
+        logger.debug('[update thread] Instance Id: %s' % ', '.join(instids))
+
+        # Get instance information from Eucalyptus
+        conn = getEucaConnection()
+        vmInstances = []
+        reservations = conn.get_all_instances(instids)
+        for reservation in reservations:
+            vmInstances += reservation.instances
+
+        # Check the IPs
+        instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
+                    for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
+        logger.debug('[update thread] IP dict: %s' % str(instIPs))
+
+        # Update the local DB
+        for ipData in instIPs:
+            dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
+            if not dbInst:
+                logger.info('[update thread] Could not find %s in DB' % ipData['id'])
+                continue
+            dbInst.meta.pri_addr = ipData['pri_addr']
+            dbInst.meta.pub_addr = ipData['pub_addr']
+            dbInst.meta.state    = 'running'
 
 def main():
     init_server()
 
-    theRSpec = None
-    with open(sys.argv[1]) as xml:
-        theRSpec = xml.read()
-    create_slice(None, 'planetcloud.pc.test', theRSpec)
+    #theRSpec = None
+    #with open(sys.argv[1]) as xml:
+    #    theRSpec = xml.read()
+    #CreateSliver(None, 'planetcloud.pc.test', theRSpec, 'call-id-cloudtest')
 
-    #rspec = get_rspec('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy')
+    #rspec = ListResources('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy', 'test_euca')
     #print rspec
+    print getKeysForSlice('gc.gc.test1')
 
 if __name__ == "__main__":
     main()