1 from __future__ import with_statement
7 from multiprocessing import Process
11 from boto.ec2.regioninfo import RegionInfo
12 from boto.exception import EC2ResponseError
13 from ConfigParser import ConfigParser
14 from xmlbuilder import XMLBuilder
15 from lxml import etree as ET
16 from sqlobject import *
18 from sfa.util.faults import *
19 from sfa.util.xrn import urn_to_hrn, Xrn
20 from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
21 from sfa.util.callids import Callids
22 from sfa.util.sfalogging import logger
23 from sfa.util.version import version_core
25 from sfa.trust.credential import Credential
27 from sfa.server.sfaapi import SfaApi
29 from sfa.plc.aggregate import Aggregate
30 from sfa.plc.slices import *
31 from sfa.rspecs.sfa_rspec import sfa_rspec_version
35 # The data structure used to represent a cloud.
36 # It contains the cloud name, its ip address, image information,
37 # key pairs, and clusters information.
42 # The location of the RelaxNG schema.
44 EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
47 # Meta data of an instance.
49 class Meta(SQLObject):
# SQLObject model holding the mutable bookkeeping of one Eucalyptus instance:
# lifecycle state, addresses and start time.
# Reverse side of the EucaInstance.meta foreign key.
50 instance = SingleJoin('EucaInstance')
# Lifecycle marker; new rows start as 'new'. Other code in this file sets it
# to 'running' (update process) and 'deleted' (CreateSliver).
51 state = StringCol(default = 'new')
# Public address; NULL until the update process stores the instance's
# public_dns_name here.
52 pub_addr = StringCol(default = None)
# Private address; NULL is what the update process queries for to find
# instances without an address yet. Filled from private_dns_name.
53 pri_addr = StringCol(default = None)
# Creation timestamp; CreateSliver passes datetime.datetime.now().
54 start_time = DateTimeCol(default = None)
57 # A representation of a Eucalyptus instance. This is a support class
58 # for instance <-> slice mapping.
60 class EucaInstance(SQLObject):
# SQLObject model for one requested Eucalyptus VM. It records the launch
# parameters and links the VM to its Slice and Meta rows.
# NOTE(review): this extract omits several source lines of the class (the
# embedded line numbers jump), so only the visible statements are described.
# Id assigned by Eucalyptus; stays None until reserveInstance() succeeds.
61 instance_id = StringCol(unique=True, default=None)
# Launch parameters handed to run_instances() in reserveInstance().
62 kernel_id = StringCol()
63 image_id = StringCol()
64 ramdisk_id = StringCol()
65 inst_type = StringCol()
66 key_pair = StringCol()
# Owning slice (reverse side: Slice.instances).
67 slice = ForeignKey('Slice')
# Bookkeeping row (reverse side: Meta.instance).
68 meta = ForeignKey('Meta')
71 # Contacts Eucalyptus and tries to reserve this instance.
73 # @param botoConn A connection to Eucalyptus.
74 # @param pubKeys A list of public keys for the instance.
# NOTE(review): CreateSliver in this file passes pubKeys as one newline-joined
# string rather than a list -- confirm which form is intended.
76 def reserveInstance(self, botoConn, pubKeys):
# Local logger shadows the module-level `logger` imported from
# sfa.util.sfalogging.
77 logger = logging.getLogger('EucaAggregate')
78 logger.info('Reserving an instance: image: %s, kernel: ' \
79 '%s, ramdisk: %s, type: %s, key: %s' % \
80 (self.image_id, self.kernel_id, self.ramdisk_id,
81 self.inst_type, self.key_pair))
83 # XXX The return statement is for testing. REMOVE in production
# NOTE(review): the `try:` that pairs with the `except` below and the end of
# the run_instances() argument list are on lines not shown in this extract.
87 reservation = botoConn.run_instances(self.image_id,
88 kernel_id = self.kernel_id,
89 ramdisk_id = self.ramdisk_id,
90 instance_type = self.inst_type,
91 key_name = self.key_pair,
# If the reservation holds several instances, instance_id ends up as the id
# of the last one iterated.
93 for instance in reservation.instances:
94 self.instance_id = instance.id
96 # If there is an error, destroy itself.
# Python 2 `except Type, name` syntax.
97 except EC2ResponseError, ec2RespErr:
# The error body is parsed as XML to extract the text of its Message element.
98 errTree = ET.fromstring(ec2RespErr.body)
99 msg = errTree.find('.//Message')
# NOTE(review): find() returns None when no Message element exists, in which
# case msg.text would raise AttributeError -- confirm the body always has one.
100 logger.error(msg.text)
104 # A representation of a PlanetLab slice. This is a support class
105 # for instance <-> slice mapping.
107 class Slice(SQLObject):
# SQLObject model mapping a slice's human-readable name to the Eucalyptus
# instances created for it.
# Not declared unique here: ListResources takes the last matching row, while
# CreateSliver uses getOne(), which expects at most one match.
108 slice_hrn = StringCol()
109 #slice_index = DatabaseIndex('slice_hrn')
# Reverse side of the EucaInstance.slice foreign key.
110 instances = MultipleJoin('EucaInstance')
113 # Initialize the aggregate manager by reading a configuration file.
116 logger = logging.getLogger('EucaAggregate')
117 fileHandler = logging.FileHandler('/var/log/euca.log')
118 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
119 logger.addHandler(fileHandler)
120 fileHandler.setLevel(logging.DEBUG)
121 logger.setLevel(logging.DEBUG)
123 configParser = ConfigParser()
124 configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
125 if len(configParser.sections()) < 1:
126 logger.error('No cloud defined in the config file')
127 raise Exception('Cannot find cloud definition in configuration file.')
129 # Only read the first section.
130 cloudSec = configParser.sections()[0]
131 cloud['name'] = cloudSec
132 cloud['access_key'] = configParser.get(cloudSec, 'access_key')
133 cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
134 cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url')
135 cloudURL = cloud['cloud_url']
136 if cloudURL.find('https://') >= 0:
137 cloudURL = cloudURL.replace('https://', '')
138 elif cloudURL.find('http://') >= 0:
139 cloudURL = cloudURL.replace('http://', '')
140 (cloud['ip'], parts) = cloudURL.split(':')
142 # Create image bundles
143 images = getEucaConnection().get_all_images()
144 cloud['images'] = images
145 cloud['imageBundles'] = {}
147 if i.type != 'machine' or i.kernel_id is None: continue
148 name = os.path.dirname(i.location)
149 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
150 cloud['imageBundles'][name] = detail
152 # Initialize sqlite3 database and tables.
153 dbPath = '/etc/sfa/db'
154 dbName = 'euca_aggregate.db'
156 if not os.path.isdir(dbPath):
157 logger.info('%s not found. Creating directory ...' % dbPath)
160 conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
161 sqlhub.processConnection = conn
162 Slice.createTable(ifNotExists=True)
163 EucaInstance.createTable(ifNotExists=True)
164 Meta.createTable(ifNotExists=True)
166 # Start the update process to keep track of the meta data
167 # about Eucalyptus instance.
168 Process(target=updateMeta).start()
170 # Make sure the schema exists.
171 if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA):
172 err = 'Cannot location schema at %s' % EUCALYPTUS_RSPEC_SCHEMA
177 # Creates a connection to Eucalyptus. This function is inspired by
178 # the make_connection() in Euca2ools.
180 # @return A connection object or None
182 def getEucaConnection():
# Builds a boto EC2 connection to the Eucalyptus front end described by the
# module-level `cloud` dict (keys 'access_key', 'secret_key', 'cloud_url').
# NOTE(review): this extract omits several source lines of this function
# (the embedded line numbers jump), so only the visible statements are
# described.
184 accessKey = cloud['access_key']
185 secretKey = cloud['secret_key']
186 eucaURL = cloud['cloud_url']
190 logger = logging.getLogger('EucaAggregate')
# NOTE(review): the message talks about environment variables, but the three
# values above are read from the `cloud` dict.
192 if not accessKey or not secretKey or not eucaURL:
193 logger.error('Please set ALL of the required environment ' \
194 'variables by sourcing the eucarc file.')
197 # Split the url into parts
# find() >= 0 matches the scheme anywhere in the string, and replace() removes
# every occurrence, not only a leading one.
198 if eucaURL.find('https://') >= 0:
200 eucaURL = eucaURL.replace('https://', '')
201 elif eucaURL.find('http://') >= 0:
203 eucaURL = eucaURL.replace('http://', '')
# The two-name unpacking requires exactly one ':' in what remains; a URL
# without a port, or with additional colons, raises ValueError here.
204 (eucaHost, parts) = eucaURL.split(':')
# After the split on '/', the first piece is the port number.
206 parts = parts.split('/')
207 eucaPort = int(parts[0])
209 srvPath = '/'.join(parts)
# The remaining keyword arguments of connect_ec2() are on lines not shown.
211 return boto.connect_ec2(aws_access_key_id=accessKey,
212 aws_secret_access_key=secretKey,
214 region=RegionInfo(None, 'eucalyptus', eucaHost),
219 # Returns a string of keys that belong to the users of the given slice.
220 # @param sliceHRN The human-readable name of the slice.
223 # This method is no longer needed because the user keys are passed into
226 def getKeysForSlice(api, sliceHRN):
# Resolves the slice through the registry, then resolves each of its persons,
# and returns their keys as a single newline-joined string.
# NOTE(review): this extract omits several source lines of this function
# (loop headers and the initialisation of `keys` are not visible).
227 logger = logging.getLogger('EucaAggregate')
228 cred = api.getCredential()
# The registry is looked up under this API's own hrn.
229 registry = api.registries[api.hrn]
232 # Get the slice record
233 records = registry.Resolve(sliceHRN, cred)
# NOTE(review): this calls the root `logging` module, not the `logger`
# obtained above, so the warning bypasses the 'EucaAggregate' logger.
235 logging.warn('Cannot find any record for slice %s' % sliceHRN)
238 # Find who can log into this slice
# Only the first resolved record is consulted.
239 persons = records[0]['persons']
241 # Extract the keys from persons records
# `p` is bound by a loop header on a line not shown; presumably it iterates
# `persons` -- confirm.
243 sliceUser = registry.Resolve(p, cred)
244 userKeys = sliceUser[0]['keys']
247 return '\n'.join(keys)
250 # A class that builds the RSpec for Eucalyptus.
252 class EucaRSpecBuilder(object):
# Builds the Eucalyptus RSpec XML document from a `cloud` dictionary using
# XMLBuilder.
# NOTE(review): this extract omits many source lines of the class (the
# embedded line numbers jump). In every method the local name `xml` is bound
# on a line not shown; presumably it is self.eucaRSpec -- confirm.
254 # Initializes an RSpec builder
256 # @param cloud A dictionary containing data about a
257 # cloud (ex. clusters, ip)
258 def __init__(self, cloud):
259 self.eucaRSpec = XMLBuilder(format = True, tab_step = " ")
# The dict is stored by reference, not copied.
260 self.cloudInfo = cloud
263 # Creates a request stanza.
265 # @param num The number of instances to create.
266 # @param image The disk image id.
267 # @param kernel The kernel image id.
268 # @param keypair Key pair to embed.
269 # @param ramdisk Ramdisk id (optional).
# Double-underscore method names are name-mangled by Python, so these helpers
# are only reachable by that name from inside the class.
271 def __requestXML(self, num, image, kernel, keypair, ramdisk = ''):
276 with xml.kernel_image(id=kernel):
282 with xml.ramdisk(id=ramdisk):
284 with xml.disk_image(id=image):
290 # Creates the cluster stanza.
292 # @param clusters Clusters information.
294 def __clustersXML(self, clusters):
295 cloud = self.cloudInfo
# Each cluster is a dict with 'name' and 'instances'; 'instances' is a list of
# tuples as produced by ZoneResultParser:
# (type, free, max, cpus, ram, disk).
298 for cluster in clusters:
299 instances = cluster['instances']
300 with xml.cluster(id=cluster['name']):
304 for inst in instances:
# inst[0] is the VM type name.
305 with xml.vm_type(name=inst[0]):
308 with xml.max_instances:
312 with xml.memory(unit='MB'):
314 with xml.disk_space(unit='GB'):
# cloud['instances'], when present, maps a VM type name to a list of dicts
# with 'id', 'state' and 'public_dns' (filled in by ListResources).
316 if 'instances' in cloud and inst[0] in cloud['instances']:
317 existingEucaInstances = cloud['instances'][inst[0]]
318 with xml.euca_instances:
319 for eucaInst in existingEucaInstances:
320 with xml.euca_instance(id=eucaInst['id']):
322 xml << eucaInst['state']
324 xml << eucaInst['public_dns']
# Emits one <bundle> element per key of the bundles dict.
326 def __imageBundleXML(self, bundles):
329 for bundle in bundles.keys():
330 with xml.bundle(id=bundle):
334 # Creates the Images stanza.
336 # @param images A list of images in Eucalyptus.
# In the visible code this helper is referenced only from a commented-out call.
338 def __imagesXML(self, images):
342 with xml.image(id=image.id):
346 xml << image.architecture
350 xml << image.location
353 # Creates the KeyPairs stanza.
355 # @param keypairs A list of key pairs in Eucalyptus.
# In the visible code this helper is referenced only from a commented-out call.
357 def __keyPairsXML(self, keypairs):
365 # Generates the RSpec.
# The `def` line of this method is not shown in this extract; ListResources
# calls EucaRSpecBuilder(cloud).toXML(), presumably this method -- confirm.
368 logger = logging.getLogger('EucaAggregate')
369 if not self.cloudInfo:
370 logger.error('No cloud information')
374 cloud = self.cloudInfo
# The document is an <RSpec type="eucalyptus"> wrapping one <network> named
# after the cloud.
375 with xml.RSpec(type='eucalyptus'):
376 with xml.network(name=cloud['name']):
379 #self.__keyPairsXML(cloud['keypairs'])
380 #self.__imagesXML(cloud['images'])
# Both keys must already be present in the cloud dict, otherwise these
# lookups raise KeyError.
381 self.__imageBundleXML(cloud['imageBundles'])
382 self.__clustersXML(cloud['clusters'])
386 # A parser to parse the output of availability-zones.
388 # Note: Only one cluster is supported. If more than one, this will
391 class ZoneResultParser(object):
# Turns the zone objects of a verbose availability-zones query into a list of
# cluster dicts with 'name', 'ip' and 'instances'.
# NOTE(review): this extract omits several source lines of the class; the
# assignment of self.zones, the `def` line of the parsing method and the
# initialisation of cluster / instList / clusterList are not visible.
392 def __init__(self, zones):
# Fewer than three entries is handled specially; the handling itself is on
# lines not shown.
396 if len(self.zones) < 3:
# Entry 0 carries the cluster name in .name and the value stored as 'ip' in
# .state.
402 cluster['name'] = self.zones[0].name
403 cluster['ip'] = self.zones[0].state
# Entries from index 2 onward are parsed as one row per VM type; entry 1 is
# skipped (presumably a column header row -- confirm).
405 for i in range(2, len(self.zones)):
406 currZone = self.zones[i]
# The type name is the second whitespace-separated token of .name.
407 instType = currZone.name.split()[1]
# .state is parsed as "<free> / <max> <cpus> <ram> <disk>": the part before
# '/' is the free count, the part after holds four integers.
409 stateString = currZone.state.split('/')
410 rscString = stateString[1].split()
412 instFree = int(stateString[0])
413 instMax = int(rscString[0])
414 instNumCpu = int(rscString[1])
415 instRam = int(rscString[2])
416 instDiskSpace = int(rscString[3])
# This tuple layout is what EucaRSpecBuilder indexes as inst[0], inst[1], ...
418 instTuple = (instType, instFree, instMax, instNumCpu, instRam, instDiskSpace)
419 instList.append(instTuple)
420 cluster['instances'] = instList
421 clusterList.append(cluster)
425 def ListResources(api, creds, options, call_id):
# Refreshes the module-level `cloud` dict from Eucalyptus (clusters, images,
# image bundles, key pairs and the slice's instances) and renders it with
# EucaRSpecBuilder.
# NOTE(review): this extract omits several source lines of this function
# (guards, loop headers, initialisers and the final return are not visible).
# A call id that was already handled yields an empty string.
426 if Callids().already_handled(call_id): return ""
428 # get slice's hrn from options
429 xrn = options.get('geni_slice_urn', '')
# `type` shadows the builtin for the rest of this function.
430 hrn, type = urn_to_hrn(xrn)
431 logger = logging.getLogger('EucaAggregate')
433 # get hrn of the original caller
434 origin_hrn = options.get('origin_hrn', None)
# Derives the caller hrn from the first credential; the condition guarding
# this line is not shown.
436 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
438 conn = getEucaConnection()
# On connection failure the error text itself is returned to the caller
# instead of an exception being raised.
441 logger.error('Cannot create a connection to Eucalyptus')
442 return 'Cannot create a connection to Eucalyptus'
446 zones = conn.get_all_zones(['verbose'])
447 p = ZoneResultParser(zones)
# `clusters` is assigned on a line not shown (presumably from the parser).
449 cloud['clusters'] = clusters
452 images = conn.get_all_images()
453 cloud['images'] = images
454 cloud['imageBundles'] = {}
# Only machine images that name a kernel become bundles.
456 if i.type != 'machine' or i.kernel_id is None: continue
# The bundle name is the directory part of the image location, so two images
# sharing a directory overwrite each other in the dict.
457 name = os.path.dirname(i.location)
458 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
459 cloud['imageBundles'][name] = detail
462 keyPairs = conn.get_all_key_pairs()
463 cloud['keypairs'] = keyPairs
469 # Get the instances that belong to the given slice from sqlite3
470 # XXX use getOne() in production because the slice's hrn is supposed
471 # to be unique. For testing, uniqueness is turned off in the db.
472 # If the slice isn't found in the database, create a record for the
474 matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
# With several matching rows the last one wins; with none, a new Slice row is
# created as a side effect of listing.
476 theSlice = matchedSlices[-1]
478 theSlice = Slice(slice_hrn = hrn)
479 for instance in theSlice.instances:
480 instanceId.append(instance.instance_id)
482 # Get the information about those instances using their ids.
483 if len(instanceId) > 0:
484 reservations = conn.get_all_instances(instanceId)
487 for reservation in reservations:
488 for instance in reservation.instances:
489 instances.append(instance)
491 # Construct a dictionary for the EucaRSpecBuilder
# Resulting shape: {instance_type: [{'id', 'public_dns', 'state', 'key'}, ...]}
493 for instance in instances:
494 instList = instancesDict.setdefault(instance.instance_type, [])
497 instInfoDict['id'] = instance.id
498 instInfoDict['public_dns'] = instance.public_dns_name
499 instInfoDict['state'] = instance.state
500 instInfoDict['key'] = instance.key_name
502 instList.append(instInfoDict)
503 cloud['instances'] = instancesDict
# Python 2 `except Type, name` syntax; the matching `try:` is not shown.
505 except EC2ResponseError, ec2RespErr:
506 errTree = ET.fromstring(ec2RespErr.body)
507 errMsgE = errTree.find('.//Message')
# NOTE(review): find() returns None when no Message element exists, in which
# case errMsgE.text would raise AttributeError.
508 logger.error(errMsgE.text)
510 rspec = EucaRSpecBuilder(cloud).toXML()
512 # Remove the instances records so next time they won't
# `cloud` is shared module state, so the per-slice entry is removed after the
# RSpec has been built.
514 if 'instances' in cloud:
515 del cloud['instances']
520 Hook called via 'sfi.py create'
522 def CreateSliver(api, slice_xrn, creds, xml, users, call_id):
# Validates the request RSpec in `xml`, terminates instances the request no
# longer lists, and reserves the newly requested instances for the slice.
# NOTE(review): this extract omits several source lines of this function
# (guards, loop headers, initialisers and the final return are not visible).
# A call id that was already handled yields an empty string.
523 if Callids().already_handled(call_id): return ""
526 logger = logging.getLogger('EucaAggregate')
527 logger.debug("In CreateSliver")
529 aggregate = Aggregate(api)
# `type` shadows the builtin for the rest of this function. `slices` is bound
# on a line not shown.
531 (hrn, type) = urn_to_hrn(slice_xrn)
532 peer = slices.get_peer(hrn)
533 sfa_peer = slices.get_sfa_peer(hrn)
# NOTE(review): users[0] raises IndexError for an empty users list unless a
# guard sits on a line not shown -- confirm.
536 slice_record = users[0].get('slice_record', {})
538 conn = getEucaConnection()
540 logger.error('Cannot create a connection to Eucalyptus')
# The RSpec is validated against the RelaxNG schema at
# EUCALYPTUS_RSPEC_SCHEMA.
544 schemaXML = ET.parse(EUCALYPTUS_RSPEC_SCHEMA)
545 rspecValidator = ET.RelaxNG(schemaXML)
546 rspecXML = ET.XML(xml)
# NOTE(review): elements are removed from the tree while iterfind() is still
# walking it; confirm that no sibling <network> is skipped as a result.
547 for network in rspecXML.iterfind("./network"):
548 if network.get('name') != cloud['name']:
549 # Throw away everything except my own RSpec
550 # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
551 network.getparent().remove(network)
# Validation happens after foreign networks were stripped; the last schema
# error is reported through InvalidRSpec.
552 if not rspecValidator(rspecXML):
553 error = rspecValidator.error_log.last_error
554 message = '%s (line %s)' % (error.message, error.line)
555 raise InvalidRSpec(message)
558 Create the sliver[s] (slice) at this aggregate.
559 Verify HRN and initialize the slice record in PLC if necessary.
562 # ensure site record exists
563 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
564 # ensure slice record exists
565 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
566 # ensure person records exists
567 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
569 # Get the slice from db or create one.
# getOne(None) yields None when no row matches instead of raising.
570 s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
572 s = Slice(slice_hrn = hrn)
574 # Process any changes in existing instance allocation
# Start with every instance the slice owns, then drop those the request still
# lists under <euca_instances>; whatever remains is terminated.
576 for sliceInst in s.instances:
577 pendingRmInst.append(sliceInst.instance_id)
578 existingInstGroup = rspecXML.findall(".//euca_instances")
579 for instGroup in existingInstGroup:
580 for existingInst in instGroup:
581 if existingInst.get('id') in pendingRmInst:
582 pendingRmInst.remove(existingInst.get('id'))
583 for inst in pendingRmInst:
# NOTE(review): getOne(None) may return None, and the next line dereferences
# dbInst.meta without a check -- this would raise AttributeError.
584 dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
585 if dbInst.meta.state != 'deleted':
586 logger.debug('Instance %s will be terminated' % inst)
587 # Terminate instances one at a time for robustness
588 conn.terminate_instances([inst])
589 # Only change the state but do not remove the entry from the DB.
590 dbInst.meta.state = 'deleted'
591 #dbInst.destroySelf()
593 # Process new instance requests
594 requests = rspecXML.findall(".//request")
596 # Get all the public keys associated with the slice.
# `user` and `keys` are bound on lines not shown.
600 logger.debug("Keys: %s" % user['keys'])
# All keys are handed to the instance as one newline-joined string.
601 pubKeys = '\n'.join(keys)
602 logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
# `req` is bound by a loop header not shown; its parent element supplies the
# VM type name.
604 vmTypeElement = req.getparent()
605 instType = vmTypeElement.get('name')
606 numInst = int(req.find('instances').text)
608 bundleName = req.find('bundle').text
# NOTE(review): cloud['imageBundles'] is a plain dict, so an unknown
# bundleName raises KeyError on the test below before the error can be
# logged; and in the visible lines execution continues to the lookup after
# the log call anyway.
609 if not cloud['imageBundles'][bundleName]:
610 logger.error('Cannot find bundle %s' % bundleName)
611 bundleInfo = cloud['imageBundles'][bundleName]
612 instKernel = bundleInfo['kernelID']
613 instDiskImg = bundleInfo['imageID']
614 instRamDisk = bundleInfo['ramdiskID']
617 # Create the instances
# One EucaInstance row, with its own Meta row, is created per requested
# instance and then reserved in Eucalyptus.
618 for i in range(0, numInst):
619 eucaInst = EucaInstance(slice = s,
620 kernel_id = instKernel,
621 image_id = instDiskImg,
622 ramdisk_id = instRamDisk,
624 inst_type = instType,
625 meta = Meta(start_time=datetime.datetime.now()))
626 eucaInst.reserveInstance(conn, pubKeys)
628 # xxx - should return altered rspec
629 # with enough data for the client to understand what's happened
633 # Return information on the IP addresses bound to each slice's instances
635 def dumpInstanceInfo():
# Writes one "<instance id> <address> <slice hrn>" line per running instance
# that has a private address to /var/www/html/euca/instances.txt.
# NOTE(review): this extract omits several source lines of this function
# (directory creation, the loop header and the assignment of `ipaddr` are not
# visible).
636 logger = logging.getLogger('EucaMeta')
637 outdir = "/var/www/html/euca/"
638 outfile = outdir + "instances.txt"
# Only an "already exists" error is tolerated here; `e` is bound by an
# except clause not shown.
643 if e.errno != errno.EEXIST:
# `!= None` is deliberate: Meta.q.pri_addr is an SQLObject query expression,
# so the comparison builds SQL and must not be rewritten as `is not None`.
646 dbResults = Meta.select(
647 AND(Meta.q.pri_addr != None,
648 Meta.q.state == 'running')
650 dbResults = list(dbResults)
# NOTE(review): no close() is visible in this extract; confirm the handle is
# closed, or wrap the writes in a `with` block.
651 f = open(outfile, "w")
653 instId = r.instance.instance_id
655 hrn = r.instance.slice.slice_hrn
656 logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
657 f.write("%s %s %s\n" % (instId, ipaddr, hrn))
661 # A separate process that will update the meta data.
664 logger = logging.getLogger('EucaMeta')
665 fileHandler = logging.FileHandler('/var/log/euca_meta.log')
666 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
667 logger.addHandler(fileHandler)
668 fileHandler.setLevel(logging.DEBUG)
669 logger.setLevel(logging.DEBUG)
674 # Get IDs of the instances that don't have IPs yet.
675 dbResults = Meta.select(
676 AND(Meta.q.pri_addr == None,
677 Meta.q.state != 'deleted')
679 dbResults = list(dbResults)
680 logger.debug('[update process] dbResults: %s' % dbResults)
685 instids.append(r.instance.instance_id)
686 logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
688 # Get instance information from Eucalyptus
689 conn = getEucaConnection()
691 reservations = conn.get_all_instances(instids)
692 for reservation in reservations:
693 vmInstances += reservation.instances
696 instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
697 for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
698 logger.debug('[update process] IP dict: %s' % str(instIPs))
700 # Update the local DB
701 for ipData in instIPs:
702 dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
704 logger.info('[update process] Could not find %s in DB' % ipData['id'])
706 dbInst.meta.pri_addr = ipData['pri_addr']
707 dbInst.meta.pub_addr = ipData['pub_addr']
708 dbInst.meta.state = 'running'
714 request_rspec_versions = [dict(sfa_rspec_version)]
715 ad_rspec_versions = [dict(sfa_rspec_version)]
716 version_more = {'interface':'aggregate',
719 'request_rspec_versions': request_rspec_versions,
720 'ad_rspec_versions': ad_rspec_versions,
721 'default_ad_rspec': dict(sfa_rspec_version)
723 return version_core(version_more)
729 # #with open(sys.argv[1]) as xml:
730 # # theRSpec = xml.read()
731 # #CreateSliver(None, 'planetcloud.pc.test', theRSpec, 'call-id-cloudtest')
733 # #rspec = ListResources('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy', 'test_euca')
736 # server_key_file = '/var/lib/sfa/authorities/server.key'
737 # server_cert_file = '/var/lib/sfa/authorities/server.cert'
738 # api = PlcSfaApi(key_file = server_key_file, cert_file = server_cert_file, interface='aggregate')
739 # print getKeysForSlice(api, 'gc.gc.test1')
741 #if __name__ == "__main__":