1 from __future__ import with_statement
7 from multiprocessing import Process
11 from boto.ec2.regioninfo import RegionInfo
12 from boto.exception import EC2ResponseError
13 from ConfigParser import ConfigParser
14 from xmlbuilder import XMLBuilder
15 from lxml import etree as ET
16 from sqlobject import *
18 from sfa.util.faults import InvalidRSpec,
19 from sfa.util.xrn import urn_to_hrn, Xrn
20 from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
21 from sfa.util.callids import Callids
22 #comes with its own logging
23 #from sfa.util.sfalogging import logger
24 from sfa.util.version import version_core
26 from sfa.trust.credential import Credential
28 from sfa.server.sfaapi import SfaApi
30 from sfa.plc.aggregate import Aggregate
31 from sfa.plc.slices import Slice, Slices
32 # not sure what this used to be nor where it is now defined
33 #from sfa.rspecs.sfa_rspec import sfa_rspec_version
36 # Meta data of an instance.
class Meta(SQLObject):
    """Metadata row kept alongside one EucaInstance record.

    The fields here are the mutable bookkeeping for an instance (its
    life-cycle state, its addresses and its creation time); the static
    launch parameters live on EucaInstance.
    """
    # Reverse side of EucaInstance.meta (a ForeignKey to this table);
    # read elsewhere in this file as `meta_row.instance.instance_id`.
    instance = SingleJoin('EucaInstance')
    # Life-cycle marker. Starts as 'new'; the other values written in this
    # file are 'running' and 'deleted'.
    state = StringCol(default = 'new')
    # Public address; the update code in this file fills it from the
    # instance's public_dns_name. None until then.
    pub_addr = StringCol(default = None)
    # Private address; filled from the instance's private_dns_name.
    # A None value is what the update query uses to find pending rows.
    pri_addr = StringCol(default = None)
    # Creation timestamp; CreateSliver passes datetime.datetime.now().
    start_time = DateTimeCol(default = None)
46 # A representation of a Eucalyptus instance. This is a support class
47 # for instance <-> slice mapping.
49 class EucaInstance(SQLObject):
50 instance_id = StringCol(unique=True, default=None)
51 kernel_id = StringCol()
52 image_id = StringCol()
53 ramdisk_id = StringCol()
54 inst_type = StringCol()
55 key_pair = StringCol()
56 slice = ForeignKey('Slice')
57 meta = ForeignKey('Meta')
60 # Contacts Eucalyptus and tries to reserve this instance.
62 # @param botoConn A connection to Eucalyptus.
63 # @param pubKeys A list of public keys for the instance.
65 def reserveInstance(self, botoConn, pubKeys):
66 logger = logging.getLogger('EucaAggregate')
67 logger.info('Reserving an instance: image: %s, kernel: ' \
68 '%s, ramdisk: %s, type: %s, key: %s' % \
69 (self.image_id, self.kernel_id, self.ramdisk_id,
70 self.inst_type, self.key_pair))
73 reservation = botoConn.run_instances(self.image_id,
74 kernel_id = self.kernel_id,
75 ramdisk_id = self.ramdisk_id,
76 instance_type = self.inst_type,
77 key_name = self.key_pair,
79 for instance in reservation.instances:
80 self.instance_id = instance.id
82 # If there is an error, destroy itself.
83 except EC2ResponseError, ec2RespErr:
84 errTree = ET.fromstring(ec2RespErr.body)
85 msg = errTree.find('.//Message')
86 logger.error(msg.text)
90 # A representation of a PlanetLab slice. This is a support class
91 # for instance <-> slice mapping.
class Slice(SQLObject):
    """Database record for a slice, used for the instance <-> slice mapping.

    NOTE: this definition rebinds the name `Slice`, which is also imported
    from sfa.plc.slices at the top of the file; after this point `Slice`
    refers to this SQLObject table.
    """
    # Human-readable name (HRN) of the slice. Rows are looked up by this
    # value, but the column carries no uniqueness constraint.
    slice_hrn = StringCol()
    #slice_index = DatabaseIndex('slice_hrn')
    # All EucaInstance rows whose `slice` foreign key points at this row.
    instances = MultipleJoin('EucaInstance')
99 # A class that builds the RSpec for Eucalyptus.
101 class EucaRSpecBuilder(object):
103 # Initializes an RSpec builder.
105 # @param cloud A dictionary containing data about a
106 # cloud (ex. clusters, ip)
def __init__(self, cloud):
    """Set up the builder state for one cloud description.

    @param cloud A dictionary containing data about a cloud
                 (ex. clusters, ip). It is stored by reference, not copied.
    """
    # Remember the caller's cloud dictionary for the later XML stanzas.
    self.cloudInfo = cloud
    # Fresh XML document that the stanza helpers write into.
    rspec_document = XMLBuilder(format = True, tab_step = " ")
    self.eucaRSpec = rspec_document
112 # Creates a request stanza.
114 # @param num The number of instances to create.
115 # @param image The disk image id.
116 # @param kernel The kernel image id.
117 # @param keypair Key pair to embed.
118 # @param ramdisk Ramdisk id (optional).
120 def __requestXML(self, num, image, kernel, keypair, ramdisk = ''):
125 with xml.kernel_image(id=kernel):
131 with xml.ramdisk(id=ramdisk):
133 with xml.disk_image(id=image):
139 # Creates the cluster stanza.
141 # @param clusters Clusters information.
143 def __clustersXML(self, clusters):
144 cloud = self.cloudInfo
147 for cluster in clusters:
148 instances = cluster['instances']
149 with xml.cluster(id=cluster['name']):
153 for inst in instances:
154 with xml.vm_type(name=inst[0]):
157 with xml.max_instances:
161 with xml.memory(unit='MB'):
163 with xml.disk_space(unit='GB'):
165 if 'instances' in cloud and inst[0] in cloud['instances']:
166 existingEucaInstances = cloud['instances'][inst[0]]
167 with xml.euca_instances:
168 for eucaInst in existingEucaInstances:
169 with xml.euca_instance(id=eucaInst['id']):
171 xml << eucaInst['state']
173 xml << eucaInst['public_dns']
175 def __imageBundleXML(self, bundles):
178 for bundle in bundles.keys():
179 with xml.bundle(id=bundle):
183 # Creates the Images stanza.
185 # @param images A list of images in Eucalyptus.
187 def __imagesXML(self, images):
191 with xml.image(id=image.id):
195 xml << image.architecture
199 xml << image.location
202 # Creates the KeyPairs stanza.
204 # @param keypairs A list of key pairs in Eucalyptus.
206 def __keyPairsXML(self, keypairs):
214 # Generates the RSpec.
217 logger = logging.getLogger('EucaAggregate')
218 if not self.cloudInfo:
219 logger.error('No cloud information')
223 cloud = self.cloudInfo
224 with xml.RSpec(type='eucalyptus'):
225 with xml.network(name=cloud['name']):
228 #self.__keyPairsXML(cloud['keypairs'])
229 #self.__imagesXML(cloud['images'])
230 self.__imageBundleXML(cloud['imageBundles'])
231 self.__clustersXML(cloud['clusters'])
235 # A parser to parse the output of availability-zones.
237 # Note: Only one cluster is supported. If more than one, this will
240 class ZoneResultParser(object):
241 def __init__(self, zones):
245 if len(self.zones) < 3:
251 cluster['name'] = self.zones[0].name
252 cluster['ip'] = self.zones[0].state
254 for i in range(2, len(self.zones)):
255 currZone = self.zones[i]
256 instType = currZone.name.split()[1]
258 stateString = currZone.state.split('/')
259 rscString = stateString[1].split()
261 instFree = int(stateString[0])
262 instMax = int(rscString[0])
263 instNumCpu = int(rscString[1])
264 instRam = int(rscString[2])
265 instDiskSpace = int(rscString[3])
267 instTuple = (instType, instFree, instMax, instNumCpu, instRam, instDiskSpace)
268 instList.append(instTuple)
269 cluster['instances'] = instList
270 clusterList.append(cluster)
274 class AggregateManagerEucalyptus:
276 # The data structure used to represent a cloud.
277 # It contains the cloud name, its ip address, image information,
278 # key pairs, and clusters information.
281 # The location of the RelaxNG schema.
282 EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
286 # the init_server mechanism has vanished
288 if AggregateManagerEucalyptus._inited: return
289 AggregateManagerEucalyptus.init_server()
291 # Initialize the aggregate manager by reading a configuration file.
294 logger = logging.getLogger('EucaAggregate')
295 fileHandler = logging.FileHandler('/var/log/euca.log')
296 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
297 logger.addHandler(fileHandler)
298 fileHandler.setLevel(logging.DEBUG)
299 logger.setLevel(logging.DEBUG)
301 configParser = ConfigParser()
302 configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
303 if len(configParser.sections()) < 1:
304 logger.error('No cloud defined in the config file')
305 raise Exception('Cannot find cloud definition in configuration file.')
307 # Only read the first section.
308 cloudSec = configParser.sections()[0]
309 AggregateManagerEucalyptus.cloud['name'] = cloudSec
310 AggregateManagerEucalyptus.cloud['access_key'] = configParser.get(cloudSec, 'access_key')
311 AggregateManagerEucalyptus.cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
312 AggregateManagerEucalyptus.cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url')
313 cloudURL = AggregateManagerEucalyptus.cloud['cloud_url']
314 if cloudURL.find('https://') >= 0:
315 cloudURL = cloudURL.replace('https://', '')
316 elif cloudURL.find('http://') >= 0:
317 cloudURL = cloudURL.replace('http://', '')
318 (AggregateManagerEucalyptus.cloud['ip'], parts) = cloudURL.split(':')
320 # Create image bundles
321 images = self.getEucaConnection().get_all_images()
322 AggregateManagerEucalyptus.cloud['images'] = images
323 AggregateManagerEucalyptus.cloud['imageBundles'] = {}
325 if i.type != 'machine' or i.kernel_id is None: continue
326 name = os.path.dirname(i.location)
327 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
328 AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
330 # Initialize sqlite3 database and tables.
331 dbPath = '/etc/sfa/db'
332 dbName = 'euca_aggregate.db'
334 if not os.path.isdir(dbPath):
335 logger.info('%s not found. Creating directory ...' % dbPath)
338 conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
339 sqlhub.processConnection = conn
340 Slice.createTable(ifNotExists=True)
341 EucaInstance.createTable(ifNotExists=True)
342 Meta.createTable(ifNotExists=True)
344 # Start the update process to keep track of the meta data
345 # about Eucalyptus instances.
346 Process(target=AggregateManagerEucalyptus.updateMeta).start()
348 # Make sure the schema exists.
349 if not os.path.exists(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA):
350 err = 'Cannot location schema at %s' % AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA
355 # A separate process that will update the meta data.
359 logger = logging.getLogger('EucaMeta')
360 fileHandler = logging.FileHandler('/var/log/euca_meta.log')
361 fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
362 logger.addHandler(fileHandler)
363 fileHandler.setLevel(logging.DEBUG)
364 logger.setLevel(logging.DEBUG)
369 # Get IDs of the instances that don't have IPs yet.
370 dbResults = Meta.select(
371 AND(Meta.q.pri_addr == None,
372 Meta.q.state != 'deleted')
374 dbResults = list(dbResults)
375 logger.debug('[update process] dbResults: %s' % dbResults)
380 instids.append(r.instance.instance_id)
381 logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
383 # Get instance information from Eucalyptus
384 conn = self.getEucaConnection()
386 reservations = conn.get_all_instances(instids)
387 for reservation in reservations:
388 vmInstances += reservation.instances
391 instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
392 for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
393 logger.debug('[update process] IP dict: %s' % str(instIPs))
395 # Update the local DB
396 for ipData in instIPs:
397 dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
399 logger.info('[update process] Could not find %s in DB' % ipData['id'])
401 dbInst.meta.pri_addr = ipData['pri_addr']
402 dbInst.meta.pub_addr = ipData['pub_addr']
403 dbInst.meta.state = 'running'
405 self.dumpinstanceInfo()
408 # Creates a connection to Eucalyptus. This function is inspired by
409 # the make_connection() in Euca2ools.
411 # @return A connection object or None
413 def getEucaConnection():
414 accessKey = AggregateManagerEucalyptus.cloud['access_key']
415 secretKey = AggregateManagerEucalyptus.cloud['secret_key']
416 eucaURL = AggregateManagerEucalyptus.cloud['cloud_url']
420 logger = logging.getLogger('EucaAggregate')
422 if not accessKey or not secretKey or not eucaURL:
423 logger.error('Please set ALL of the required environment ' \
424 'variables by sourcing the eucarc file.')
427 # Split the url into parts
428 if eucaURL.find('https://') >= 0:
430 eucaURL = eucaURL.replace('https://', '')
431 elif eucaURL.find('http://') >= 0:
433 eucaURL = eucaURL.replace('http://', '')
434 (eucaHost, parts) = eucaURL.split(':')
436 parts = parts.split('/')
437 eucaPort = int(parts[0])
439 srvPath = '/'.join(parts)
441 return boto.connect_ec2(aws_access_key_id=accessKey,
442 aws_secret_access_key=secretKey,
444 region=RegionInfo(None, 'eucalyptus', eucaHost),
448 def ListResources(api, creds, options, call_id):
449 if Callids().already_handled(call_id): return ""
450 # get slice's hrn from options
451 xrn = options.get('geni_slice_urn', '')
452 hrn, type = urn_to_hrn(xrn)
453 logger = logging.getLogger('EucaAggregate')
455 # get hrn of the original caller
456 origin_hrn = options.get('origin_hrn', None)
458 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
460 conn = self.getEucaConnection()
463 logger.error('Cannot create a connection to Eucalyptus')
464 return 'Cannot create a connection to Eucalyptus'
468 zones = conn.get_all_zones(['verbose'])
469 p = ZoneResultParser(zones)
471 AggregateManagerEucalyptus.cloud['clusters'] = clusters
474 images = conn.get_all_images()
475 AggregateManagerEucalyptus.cloud['images'] = images
476 AggregateManagerEucalyptus.cloud['imageBundles'] = {}
478 if i.type != 'machine' or i.kernel_id is None: continue
479 name = os.path.dirname(i.location)
480 detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
481 AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
484 keyPairs = conn.get_all_key_pairs()
485 AggregateManagerEucalyptus.cloud['keypairs'] = keyPairs
491 # Get the instances that belong to the given slice from sqlite3
492 # XXX use getOne() in production because the slice's hrn is supposed
493 # to be unique. For testing, uniqueness is turned off in the db.
494 # If the slice isn't found in the database, create a record for the
496 matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
498 theSlice = matchedSlices[-1]
500 theSlice = Slice(slice_hrn = hrn)
501 for instance in theSlice.instances:
502 instanceId.append(instance.instance_id)
504 # Get the information about those instances using their ids.
505 if len(instanceId) > 0:
506 reservations = conn.get_all_instances(instanceId)
509 for reservation in reservations:
510 for instance in reservation.instances:
511 instances.append(instance)
513 # Construct a dictionary for the EucaRSpecBuilder
515 for instance in instances:
516 instList = instancesDict.setdefault(instance.instance_type, [])
519 instInfoDict['id'] = instance.id
520 instInfoDict['public_dns'] = instance.public_dns_name
521 instInfoDict['state'] = instance.state
522 instInfoDict['key'] = instance.key_name
524 instList.append(instInfoDict)
525 AggregateManagerEucalyptus.cloud['instances'] = instancesDict
527 except EC2ResponseError, ec2RespErr:
528 errTree = ET.fromstring(ec2RespErr.body)
529 errMsgE = errTree.find('.//Message')
530 logger.error(errMsgE.text)
532 rspec = EucaRSpecBuilder(AggregateManagerEucalyptus.cloud).toXML()
534 # Remove the instances records so next time they won't
536 if 'instances' in AggregateManagerEucalyptus.cloud:
537 del AggregateManagerEucalyptus.cloud['instances']
542 Hook called via 'sfi.py create'
544 def CreateSliver(api, slice_xrn, creds, xml, users, call_id):
545 if Callids().already_handled(call_id): return ""
547 logger = logging.getLogger('EucaAggregate')
548 logger.debug("In CreateSliver")
550 aggregate = Aggregate(api)
552 (hrn, type) = urn_to_hrn(slice_xrn)
553 peer = slices.get_peer(hrn)
554 sfa_peer = slices.get_sfa_peer(hrn)
557 slice_record = users[0].get('slice_record', {})
559 conn = self.getEucaConnection()
561 logger.error('Cannot create a connection to Eucalyptus')
565 schemaXML = ET.parse(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA)
566 rspecValidator = ET.RelaxNG(schemaXML)
567 rspecXML = ET.XML(xml)
568 for network in rspecXML.iterfind("./network"):
569 if network.get('name') != AggregateManagerEucalyptus.cloud['name']:
570 # Throw away everything except my own RSpec
571 # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
572 network.getparent().remove(network)
573 if not rspecValidator(rspecXML):
574 error = rspecValidator.error_log.last_error
575 message = '%s (line %s)' % (error.message, error.line)
576 raise InvalidRSpec(message)
579 Create the sliver[s] (slice) at this aggregate.
580 Verify HRN and initialize the slice record in PLC if necessary.
583 # ensure site record exists
584 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
585 # ensure slice record exists
586 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
587 # ensure person records exists
588 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
590 # Get the slice from db or create one.
591 s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
593 s = Slice(slice_hrn = hrn)
595 # Process any changes in existing instance allocation
597 for sliceInst in s.instances:
598 pendingRmInst.append(sliceInst.instance_id)
599 existingInstGroup = rspecXML.findall(".//euca_instances")
600 for instGroup in existingInstGroup:
601 for existingInst in instGroup:
602 if existingInst.get('id') in pendingRmInst:
603 pendingRmInst.remove(existingInst.get('id'))
604 for inst in pendingRmInst:
605 dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
606 if dbInst.meta.state != 'deleted':
607 logger.debug('Instance %s will be terminated' % inst)
608 # Terminate instances one at a time for robustness
609 conn.terminate_instances([inst])
610 # Only change the state but do not remove the entry from the DB.
611 dbInst.meta.state = 'deleted'
612 #dbInst.destroySelf()
614 # Process new instance requests
615 requests = rspecXML.findall(".//request")
617 # Get all the public keys associated with the slice.
621 logger.debug("Keys: %s" % user['keys'])
622 pubKeys = '\n'.join(keys)
623 logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
625 vmTypeElement = req.getparent()
626 instType = vmTypeElement.get('name')
627 numInst = int(req.find('instances').text)
629 bundleName = req.find('bundle').text
630 if not AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]:
631 logger.error('Cannot find bundle %s' % bundleName)
632 bundleInfo = AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]
633 instKernel = bundleInfo['kernelID']
634 instDiskImg = bundleInfo['imageID']
635 instRamDisk = bundleInfo['ramdiskID']
638 # Create the instances
639 for i in range(0, numInst):
640 eucaInst = EucaInstance(slice = s,
641 kernel_id = instKernel,
642 image_id = instDiskImg,
643 ramdisk_id = instRamDisk,
645 inst_type = instType,
646 meta = Meta(start_time=datetime.datetime.now()))
647 eucaInst.reserveInstance(conn, pubKeys)
649 # xxx - should return altered rspec
650 # with enough data for the client to understand what's happened
654 # Return information on the IP addresses bound to each slice's instances
656 def dumpInstanceInfo():
657 logger = logging.getLogger('EucaMeta')
658 outdir = "/var/www/html/euca/"
659 outfile = outdir + "instances.txt"
664 if e.errno != errno.EEXIST:
667 dbResults = Meta.select(
668 AND(Meta.q.pri_addr != None,
669 Meta.q.state == 'running')
671 dbResults = list(dbResults)
672 f = open(outfile, "w")
674 instId = r.instance.instance_id
676 hrn = r.instance.slice.slice_hrn
677 logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
678 f.write("%s %s %s\n" % (instId, ipaddr, hrn))
683 request_rspec_versions = [dict(sfa_rspec_version)]
684 ad_rspec_versions = [dict(sfa_rspec_version)]
685 version_more = {'interface':'aggregate',
688 'request_rspec_versions': request_rspec_versions,
689 'ad_rspec_versions': ad_rspec_versions,
690 'default_ad_rspec': dict(sfa_rspec_version)
692 return version_core(version_more)