removed imports on sfa.util.rspec that is gone
[sfa.git] / sfa / managers / aggregate_manager_eucalyptus.py
index 09715cc..6c7c1f4 100644 (file)
@@ -1,7 +1,9 @@
 from __future__ import with_statement 
 
 import sys
-import os
+import os, errno
+import logging
+import datetime
 
 import boto
 from boto.ec2.regioninfo import RegionInfo
@@ -12,12 +14,20 @@ from lxml import etree as ET
 from sqlobject import *
 
 from sfa.util.faults import *
-from sfa.util.xrn import urn_to_hrn
-from sfa.util.rspec import RSpec
+from sfa.util.xrn import urn_to_hrn, Xrn
 from sfa.server.registry import Registries
 from sfa.trust.credential import Credential
 from sfa.plc.api import SfaAPI
+from sfa.plc.aggregate import Aggregate
+from sfa.plc.slices import *
 from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
+from sfa.util.callids import Callids
+from sfa.util.sfalogging import logger
+from sfa.rspecs.sfa_rspec import sfa_rspec_version
+from sfa.util.version import version_core
+
+from multiprocessing import Process
+from time import sleep
 
 ##
 # The data structure used to represent a cloud.
@@ -31,10 +41,18 @@ cloud = {}
 #
 EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
 
-# Quick hack
-sys.stderr = file('/var/log/euca_agg.log', 'a+')
 api = SfaAPI()
 
+##
+# Meta data of an instance.
+#
+class Meta(SQLObject):
+    instance   = SingleJoin('EucaInstance')
+    state      = StringCol(default = 'new')
+    pub_addr   = StringCol(default = None)
+    pri_addr   = StringCol(default = None)
+    start_time = DateTimeCol(default = None)
+
 ##
 # A representation of an Eucalyptus instance. This is a support class
 # for instance <-> slice mapping.
@@ -46,7 +64,8 @@ class EucaInstance(SQLObject):
     ramdisk_id  = StringCol()
     inst_type   = StringCol()
     key_pair    = StringCol()
-    slice = ForeignKey('Slice')
+    slice       = ForeignKey('Slice')
+    meta        = ForeignKey('Meta')
 
     ##
     # Contacts Eucalyptus and tries to reserve this instance.
@@ -55,10 +74,11 @@ class EucaInstance(SQLObject):
     # @param pubKeys A list of public keys for the instance.
     #
     def reserveInstance(self, botoConn, pubKeys):
-        print >>sys.stderr, 'Reserving an instance: image: %s, kernel: ' \
-                            '%s, ramdisk: %s, type: %s, key: %s' % \
-                            (self.image_id, self.kernel_id, self.ramdisk_id, 
-                             self.inst_type, self.key_pair)
+        logger = logging.getLogger('EucaAggregate')
+        logger.info('Reserving an instance: image: %s, kernel: ' \
+                    '%s, ramdisk: %s, type: %s, key: %s' % \
+                    (self.image_id, self.kernel_id, self.ramdisk_id,
+                    self.inst_type, self.key_pair))
 
         # XXX The return statement is for testing. REMOVE in production
         #return
@@ -77,7 +97,7 @@ class EucaInstance(SQLObject):
         except EC2ResponseError, ec2RespErr:
             errTree = ET.fromstring(ec2RespErr.body)
             msg = errTree.find('.//Message')
-            print >>sys.stderr, msg.text
+            logger.error(msg.text)
             self.destroySelf()
 
 ##
@@ -93,10 +113,17 @@ class Slice(SQLObject):
 # Initialize the aggregate manager by reading a configuration file.
 #
 def init_server():
+    logger = logging.getLogger('EucaAggregate')
+    fileHandler = logging.FileHandler('/var/log/euca.log')
+    fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+    logger.addHandler(fileHandler)
+    fileHandler.setLevel(logging.DEBUG)
+    logger.setLevel(logging.DEBUG)
+
     configParser = ConfigParser()
     configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
     if len(configParser.sections()) < 1:
-        print >>sys.stderr, 'No cloud defined in the config file'
+        logger.error('No cloud defined in the config file')
         raise Exception('Cannot find cloud definition in configuration file.')
 
     # Only read the first section.
@@ -112,23 +139,38 @@ def init_server():
         cloudURL = cloudURL.replace('http://', '')
     (cloud['ip'], parts) = cloudURL.split(':')
 
-    # Initialize sqlite3 database.
+    # Create image bundles
+    images = getEucaConnection().get_all_images()
+    cloud['images'] = images
+    cloud['imageBundles'] = {}
+    for i in images:
+        if i.type != 'machine' or i.kernel_id is None: continue
+        name = os.path.dirname(i.location)
+        detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
+        cloud['imageBundles'][name] = detail
+
+    # Initialize sqlite3 database and tables.
     dbPath = '/etc/sfa/db'
     dbName = 'euca_aggregate.db'
 
     if not os.path.isdir(dbPath):
-        print >>sys.stderr, '%s not found. Creating directory ...' % dbPath
+        logger.info('%s not found. Creating directory ...' % dbPath)
         os.mkdir(dbPath)
 
     conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
     sqlhub.processConnection = conn
     Slice.createTable(ifNotExists=True)
     EucaInstance.createTable(ifNotExists=True)
+    Meta.createTable(ifNotExists=True)
+
+    # Start the update process to keep track of the meta data
+    # about Eucalyptus instance.
+    Process(target=updateMeta).start()
 
     # Make sure the schema exists.
     if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA):
         err = 'Cannot location schema at %s' % EUCALYPTUS_RSPEC_SCHEMA
-        print >>sys.stderr, err
+        logger.error(err)
         raise Exception(err)
 
 ##
@@ -145,10 +187,11 @@ def getEucaConnection():
     useSSL    = False
     srvPath   = '/'
     eucaPort  = 8773
+    logger    = logging.getLogger('EucaAggregate')
 
     if not accessKey or not secretKey or not eucaURL:
-        print >>sys.stderr, 'Please set ALL of the required environment ' \
-                            'variables by sourcing the eucarc file.'
+        logger.error('Please set ALL of the required environment ' \
+                     'variables by sourcing the eucarc file.')
         return None
     
     # Split the url into parts
@@ -177,36 +220,31 @@ def getEucaConnection():
 # @param sliceHRN The hunman readable name of the slice.
 # @return sting()
 #
-def getKeysForSlice(sliceHRN):
-    try:
-        # convert hrn to slice name
-        plSliceName = hrn_to_pl_slicename(sliceHRN)
-    except IndexError, e:
-        print >>sys.stderr, 'Invalid slice name (%s)' % sliceHRN
-        return []
-
-    # Get the slice's information
-    sliceData = api.plshell.GetSlices(api.plauth, {'name':plSliceName})
-    if not sliceData:
-        print >>sys.stderr, 'Cannot get any data for slice %s' % plSliceName
+# This method is no longer needed because the user keys are passed into
+# CreateSliver
+#
+def getKeysForSlice(api, sliceHRN):
+    logger   = logging.getLogger('EucaAggregate')
+    cred     = api.getCredential()
+    registry = api.registries[api.hrn]
+    keys     = []
+
+    # Get the slice record
+    records = registry.Resolve(sliceHRN, cred)
+    if not records:
+        logging.warn('Cannot find any record for slice %s' % sliceHRN)
         return []
 
-    # It should only return a list with len = 1
-    sliceData = sliceData[0]
+    # Find who can log into this slice
+    persons = records[0]['persons']
 
-    keys = []
-    person_ids = sliceData['person_ids']
-    if not person_ids: 
-        print >>sys.stderr, 'No users in slice %s' % sliceHRN
-        return []
+    # Extract the keys from persons records
+    for p in persons:
+        sliceUser = registry.Resolve(p, cred)
+        userKeys = sliceUser[0]['keys']
+        keys += userKeys
 
-    persons = api.plshell.GetPersons(api.plauth, person_ids)
-    for person in persons:
-        pkeys = api.plshell.GetKeys(api.plauth, person['key_ids'])
-        for key in pkeys:
-            keys.append(key['key'])
-    return ''.join(keys)
+    return '\n'.join(keys)
 
 ##
 # A class that builds the RSpec for Eucalyptus.
@@ -275,8 +313,6 @@ class EucaRSpecBuilder(object):
                                 xml << str(inst[4])
                             with xml.disk_space(unit='GB'):
                                 xml << str(inst[5])
-                            if inst[0] == 'm1.small':
-                                self.__requestXML(1, 'emi-88760F45', 'eki-F26610C6', 'cortex')
                             if 'instances' in cloud and inst[0] in cloud['instances']:
                                 existingEucaInstances = cloud['instances'][inst[0]]
                                 with xml.euca_instances:
@@ -286,8 +322,13 @@ class EucaRSpecBuilder(object):
                                                 xml << eucaInst['state']
                                             with xml.public_dns:
                                                 xml << eucaInst['public_dns']
-                                            with xml.keypair:
-                                                xml << eucaInst['key']
+
+    def __imageBundleXML(self, bundles):
+        xml = self.eucaRSpec
+        with xml.bundles:
+            for bundle in bundles.keys():
+                with xml.bundle(id=bundle):
+                    xml << ''
 
     ##
     # Creates the Images stanza.
@@ -324,18 +365,20 @@ class EucaRSpecBuilder(object):
     # Generates the RSpec.
     #
     def toXML(self):
+        logger = logging.getLogger('EucaAggregate')
         if not self.cloudInfo:
-            print >>sys.stderr, 'No cloud information'
+            logger.error('No cloud information')
             return ''
 
         xml = self.eucaRSpec
         cloud = self.cloudInfo
         with xml.RSpec(type='eucalyptus'):
-            with xml.cloud(id=cloud['name']):
+            with xml.network(name=cloud['name']):
                 with xml.ipv4:
                     xml << cloud['ip']
-                self.__keyPairsXML(cloud['keypairs'])
-                self.__imagesXML(cloud['images'])
+                #self.__keyPairsXML(cloud['keypairs'])
+                #self.__imagesXML(cloud['images'])
+                self.__imageBundleXML(cloud['imageBundles'])
                 self.__clustersXML(cloud['clusters'])
         return str(xml)
 
@@ -379,11 +422,13 @@ class ZoneResultParser(object):
 
         return clusterList
 
-def get_rspec(api, creds, options): 
+def ListResources(api, creds, options, call_id): 
+    if Callids().already_handled(call_id): return ""
     global cloud
     # get slice's hrn from options
     xrn = options.get('geni_slice_urn', '')
     hrn, type = urn_to_hrn(xrn)
+    logger = logging.getLogger('EucaAggregate')
 
     # get hrn of the original caller
     origin_hrn = options.get('origin_hrn', None)
@@ -393,7 +438,7 @@ def get_rspec(api, creds, options):
     conn = getEucaConnection()
 
     if not conn:
-        print >>sys.stderr, 'Error: Cannot create a connection to Eucalyptus'
+        logger.error('Cannot create a connection to Eucalyptus')
         return 'Cannot create a connection to Eucalyptus'
 
     try:
@@ -406,6 +451,12 @@ def get_rspec(api, creds, options):
         # Images
         images = conn.get_all_images()
         cloud['images'] = images
+        cloud['imageBundles'] = {}
+        for i in images:
+            if i.type != 'machine' or i.kernel_id is None: continue
+            name = os.path.dirname(i.location)
+            detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
+            cloud['imageBundles'][name] = detail
 
         # Key Pairs
         keyPairs = conn.get_all_key_pairs()
@@ -437,7 +488,7 @@ def get_rspec(api, creds, options):
                 for instance in reservation.instances:
                     instances.append(instance)
 
-            # Construct a dictory for the EucaRSpecBuilder
+            # Construct a dictionary for the EucaRSpecBuilder
             instancesDict = {}
             for instance in instances:
                 instList = instancesDict.setdefault(instance.instance_type, [])
@@ -454,7 +505,7 @@ def get_rspec(api, creds, options):
     except EC2ResponseError, ec2RespErr:
         errTree = ET.fromstring(ec2RespErr.body)
         errMsgE = errTree.find('.//Message')
-        print >>sys.stderr, errMsgE.text
+        logger.error(errMsgE.text)
 
     rspec = EucaRSpecBuilder(cloud).toXML()
 
@@ -468,25 +519,52 @@ def get_rspec(api, creds, options):
 """
 Hook called via 'sfi.py create'
 """
-def create_slice(api, xrn, creds, xml, users):
+def CreateSliver(api, slice_xrn, creds, xml, users, call_id):
+    if Callids().already_handled(call_id): return ""
+
     global cloud
-    hrn = urn_to_hrn(xrn)[0]
+    logger = logging.getLogger('EucaAggregate')
+    logger.debug("In CreateSliver")
+
+    aggregate = Aggregate(api)
+    slices = Slices(api)
+    (hrn, type) = urn_to_hrn(slice_xrn)
+    peer = slices.get_peer(hrn)
+    sfa_peer = slices.get_sfa_peer(hrn)
+    slice_record=None
+    if users:
+        slice_record = users[0].get('slice_record', {})
 
     conn = getEucaConnection()
     if not conn:
-        print >>sys.stderr, 'Error: Cannot create a connection to Eucalyptus'
-        return False
+        logger.error('Cannot create a connection to Eucalyptus')
+        return ""
 
     # Validate RSpec
     schemaXML = ET.parse(EUCALYPTUS_RSPEC_SCHEMA)
     rspecValidator = ET.RelaxNG(schemaXML)
     rspecXML = ET.XML(xml)
+    for network in rspecXML.iterfind("./network"):
+        if network.get('name') != cloud['name']:
+            # Throw away everything except my own RSpec
+            # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
+            network.getparent().remove(network)
     if not rspecValidator(rspecXML):
         error = rspecValidator.error_log.last_error
         message = '%s (line %s)' % (error.message, error.line) 
-        # XXX: InvalidRSpec is new. Currently, I am not working with Trunk code.
-        #raise InvalidRSpec(message)
-        raise Exception(message)
+        raise InvalidRSpec(message)
+
+    """
+    Create the sliver[s] (slice) at this aggregate.
+    Verify HRN and initialize the slice record in PLC if necessary.
+    """
+
+    # ensure site record exists
+    site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
+    # ensure slice record exists
+    slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
+    # ensure person records exists
+    persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
 
     # Get the slice from db or create one.
     s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
@@ -497,49 +575,152 @@ def create_slice(api, xrn, creds, xml, users):
     pendingRmInst = []
     for sliceInst in s.instances:
         pendingRmInst.append(sliceInst.instance_id)
-    existingInstGroup = rspecXML.findall('.//euca_instances')
+    existingInstGroup = rspecXML.findall(".//euca_instances")
     for instGroup in existingInstGroup:
         for existingInst in instGroup:
             if existingInst.get('id') in pendingRmInst:
                 pendingRmInst.remove(existingInst.get('id'))
     for inst in pendingRmInst:
-        print >>sys.stderr, 'Instance %s will be terminated' % inst
         dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
-        dbInst.destroySelf()
-    conn.terminate_instances(pendingRmInst)
+        if dbInst.meta.state != 'deleted':
+            logger.debug('Instance %s will be terminated' % inst)
+            # Terminate instances one at a time for robustness
+            conn.terminate_instances([inst])
+            # Only change the state but do not remove the entry from the DB.
+            dbInst.meta.state = 'deleted'
+            #dbInst.destroySelf()
 
     # Process new instance requests
-    requests = rspecXML.findall('.//request')
+    requests = rspecXML.findall(".//request")
     if requests:
         # Get all the public keys associate with slice.
-        pubKeys = getKeysForSlice(s.slice_hrn)
-        print sys.stderr, "Passing the following keys to the instance:\n%s" % pubKeys
+        keys = []
+        for user in users:
+            keys += user['keys']
+            logger.debug("Keys: %s" % user['keys'])
+        pubKeys = '\n'.join(keys)
+        logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
     for req in requests:
         vmTypeElement = req.getparent()
         instType = vmTypeElement.get('name')
         numInst  = int(req.find('instances').text)
-        instKernel  = req.find('kernel_image').get('id')
-        instDiskImg = req.find('disk_image').get('id')
-        instKey     = req.find('keypair').text
         
-        ramDiskElement = req.find('ramdisk')
-        ramDiskAttr    = ramDiskElement.attrib
-        if 'id' in ramDiskAttr:
-            instRamDisk = ramDiskAttr['id']
-        else:
-            instRamDisk = None
+        bundleName = req.find('bundle').text
+        if not cloud['imageBundles'][bundleName]:
+            logger.error('Cannot find bundle %s' % bundleName)
+        bundleInfo = cloud['imageBundles'][bundleName]
+        instKernel  = bundleInfo['kernelID']
+        instDiskImg = bundleInfo['imageID']
+        instRamDisk = bundleInfo['ramdiskID']
+        instKey     = None
 
         # Create the instances
         for i in range(0, numInst):
-            eucaInst = EucaInstance(slice = s, 
-                                    kernel_id = instKernel,
-                                    image_id = instDiskImg,
+            eucaInst = EucaInstance(slice      = s,
+                                    kernel_id  = instKernel,
+                                    image_id   = instDiskImg,
                                     ramdisk_id = instRamDisk,
-                                    key_pair = instKey,
-                                    inst_type = instType)
+                                    key_pair   = instKey,
+                                    inst_type  = instType,
+                                    meta       = Meta(start_time=datetime.datetime.now()))
             eucaInst.reserveInstance(conn, pubKeys)
 
-    return True
+    # xxx - should return altered rspec 
+    # with enough data for the client to understand what's happened
+    return xml
+
+##
+# Return information on the IP addresses bound to each slice's instances
+#
+def dumpInstanceInfo():
+    logger = logging.getLogger('EucaMeta')
+    outdir = "/var/www/html/euca/"
+    outfile = outdir + "instances.txt"
+
+    try:
+        os.makedirs(outdir)
+    except OSError, e:
+        if e.errno != errno.EEXIST:
+            raise
+
+    dbResults = Meta.select(
+        AND(Meta.q.pri_addr != None,
+            Meta.q.state    == 'running')
+        )
+    dbResults = list(dbResults)
+    f = open(outfile, "w")
+    for r in dbResults:
+        instId = r.instance.instance_id
+        ipaddr = r.pri_addr
+        hrn = r.instance.slice.slice_hrn
+        logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
+        f.write("%s %s %s\n" % (instId, ipaddr, hrn))
+    f.close()
+
+##
+# A separate process that will update the meta data.
+#
+def updateMeta():
+    logger = logging.getLogger('EucaMeta')
+    fileHandler = logging.FileHandler('/var/log/euca_meta.log')
+    fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+    logger.addHandler(fileHandler)
+    fileHandler.setLevel(logging.DEBUG)
+    logger.setLevel(logging.DEBUG)
+
+    while True:
+        sleep(30)
+
+        # Get IDs of the instances that don't have IPs yet.
+        dbResults = Meta.select(
+                      AND(Meta.q.pri_addr == None,
+                          Meta.q.state    != 'deleted')
+                    )
+        dbResults = list(dbResults)
+        logger.debug('[update process] dbResults: %s' % dbResults)
+        instids = []
+        for r in dbResults:
+            if not r.instance:
+                continue
+            instids.append(r.instance.instance_id)
+        logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
+
+        # Get instance information from Eucalyptus
+        conn = getEucaConnection()
+        vmInstances = []
+        reservations = conn.get_all_instances(instids)
+        for reservation in reservations:
+            vmInstances += reservation.instances
+
+        # Check the IPs
+        instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
+                    for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
+        logger.debug('[update process] IP dict: %s' % str(instIPs))
+
+        # Update the local DB
+        for ipData in instIPs:
+            dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
+            if not dbInst:
+                logger.info('[update process] Could not find %s in DB' % ipData['id'])
+                continue
+            dbInst.meta.pri_addr = ipData['pri_addr']
+            dbInst.meta.pub_addr = ipData['pub_addr']
+            dbInst.meta.state    = 'running'
+
+        dumpInstanceInfo()
+
+def GetVersion(api):
+    xrn=Xrn(api.hrn)
+    request_rspec_versions = [dict(sfa_rspec_version)]
+    ad_rspec_versions = [dict(sfa_rspec_version)]
+    version_more = {'interface':'aggregate',
+                    'testbed':'myplc',
+                    'hrn':xrn.get_hrn(),
+                    'request_rspec_versions': request_rspec_versions,
+                    'ad_rspec_versions': ad_rspec_versions,
+                    'default_ad_rspec': dict(sfa_rspec_version)
+                    }
+    return version_core(version_more)
 
 def main():
     init_server()
@@ -547,11 +728,15 @@ def main():
     #theRSpec = None
     #with open(sys.argv[1]) as xml:
     #    theRSpec = xml.read()
-    #create_slice(None, 'planetcloud.pc.test', theRSpec)
+    #CreateSliver(None, 'planetcloud.pc.test', theRSpec, 'call-id-cloudtest')
 
-    #rspec = get_rspec('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy')
+    #rspec = ListResources('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy', 'test_euca')
     #print rspec
-    print getKeysForSlice('gc.gc.test1')
+
+    server_key_file = '/var/lib/sfa/authorities/server.key'
+    server_cert_file = '/var/lib/sfa/authorities/server.cert'
+    api = SfaAPI(key_file = server_key_file, cert_file = server_cert_file, interface='aggregate')
+    print getKeysForSlice(api, 'gc.gc.test1')
 
 if __name__ == "__main__":
     main()