Merge branch 'master' into eucalyptus-devel
[sfa.git] / sfa / managers / aggregate_manager_eucalyptus.py
1 from __future__ import with_statement 
2
3 import sys
4 import os
5 import logging
6 import datetime
7
8 import boto
9 from boto.ec2.regioninfo import RegionInfo
10 from boto.exception import EC2ResponseError
11 from ConfigParser import ConfigParser
12 from xmlbuilder import XMLBuilder
13 from lxml import etree as ET
14 from sqlobject import *
15
16 from sfa.util.faults import *
17 from sfa.util.xrn import urn_to_hrn, Xrn
18 from sfa.util.rspec import RSpec
19 from sfa.server.registry import Registries
20 from sfa.trust.credential import Credential
21 from sfa.plc.api import SfaAPI
22 from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
23 from sfa.util.callids import Callids
24 from sfa.util.sfalogging import sfa_logger
25 from sfa.rspecs.sfa_rspec import sfa_rspec_version
26 from sfa.util.version import version_core
27
28 from threading import Thread
29 from time import sleep
30
31 ##
32 # The data structure used to represent a cloud.
33 # It contains the cloud name, its ip address, image information,
34 # key pairs, and clusters information.
35 #
36 cloud = {}
37
38 ##
39 # The location of the RelaxNG schema.
40 #
41 EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
42
43 api = SfaAPI()
44
45 ##
46 # Meta data of an instance.
47 #
48 class Meta(SQLObject):
49     instance   = SingleJoin('EucaInstance')
50     state      = StringCol(default = 'new')
51     pub_addr   = StringCol(default = None)
52     pri_addr   = StringCol(default = None)
53     start_time = DateTimeCol(default = None)
54
55 ##
56 # A representation of an Eucalyptus instance. This is a support class
57 # for instance <-> slice mapping.
58 #
59 class EucaInstance(SQLObject):
60     instance_id = StringCol(unique=True, default=None)
61     kernel_id   = StringCol()
62     image_id    = StringCol()
63     ramdisk_id  = StringCol()
64     inst_type   = StringCol()
65     key_pair    = StringCol()
66     slice       = ForeignKey('Slice')
67     meta        = ForeignKey('Meta')
68
69     ##
70     # Contacts Eucalyptus and tries to reserve this instance.
71     # 
72     # @param botoConn A connection to Eucalyptus.
73     # @param pubKeys A list of public keys for the instance.
74     #
75     def reserveInstance(self, botoConn, pubKeys):
76         logger = logging.getLogger('EucaAggregate')
77         logger.info('Reserving an instance: image: %s, kernel: ' \
78                     '%s, ramdisk: %s, type: %s, key: %s' % \
79                     (self.image_id, self.kernel_id, self.ramdisk_id,
80                     self.inst_type, self.key_pair))
81
82         # XXX The return statement is for testing. REMOVE in production
83         #return
84
85         try:
86             reservation = botoConn.run_instances(self.image_id,
87                                                  kernel_id = self.kernel_id,
88                                                  ramdisk_id = self.ramdisk_id,
89                                                  instance_type = self.inst_type,
90                                                  key_name  = self.key_pair,
91                                                  user_data = pubKeys)
92             for instance in reservation.instances:
93                 self.instance_id = instance.id
94
95         # If there is an error, destroy itself.
96         except EC2ResponseError, ec2RespErr:
97             errTree = ET.fromstring(ec2RespErr.body)
98             msg = errTree.find('.//Message')
99             logger.error(msg.text)
100             self.destroySelf()
101
102 ##
103 # A representation of a PlanetLab slice. This is a support class
104 # for instance <-> slice mapping.
105 #
106 class Slice(SQLObject):
107     slice_hrn = StringCol()
108     #slice_index = DatabaseIndex('slice_hrn')
109     instances = MultipleJoin('EucaInstance')
110
111 ##
112 # Initialize the aggregate manager by reading a configuration file.
113 #
114 def init_server():
115     logger = logging.getLogger('EucaAggregate')
116     fileHandler = logging.FileHandler('/var/log/euca.log')
117     fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
118     logger.addHandler(fileHandler)
119     logger.setLevel(logging.DEBUG)
120
121     configParser = ConfigParser()
122     configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
123     if len(configParser.sections()) < 1:
124         logger.error('No cloud defined in the config file')
125         raise Exception('Cannot find cloud definition in configuration file.')
126
127     # Only read the first section.
128     cloudSec = configParser.sections()[0]
129     cloud['name'] = cloudSec
130     cloud['access_key'] = configParser.get(cloudSec, 'access_key')
131     cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
132     cloud['cloud_url']  = configParser.get(cloudSec, 'cloud_url')
133     cloudURL = cloud['cloud_url']
134     if cloudURL.find('https://') >= 0:
135         cloudURL = cloudURL.replace('https://', '')
136     elif cloudURL.find('http://') >= 0:
137         cloudURL = cloudURL.replace('http://', '')
138     (cloud['ip'], parts) = cloudURL.split(':')
139
140     # Create image bundles
141     images = getEucaConnection().get_all_images()
142     cloud['images'] = images
143     cloud['imageBundles'] = {}
144     for i in images:
145         if i.type != 'machine' or i.kernel_id is None: continue
146         name = os.path.dirname(i.location)
147         detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
148         cloud['imageBundles'][name] = detail
149
150     # Initialize sqlite3 database and tables.
151     dbPath = '/etc/sfa/db'
152     dbName = 'euca_aggregate.db'
153
154     if not os.path.isdir(dbPath):
155         logger.info('%s not found. Creating directory ...' % dbPath)
156         os.mkdir(dbPath)
157
158     conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
159     sqlhub.processConnection = conn
160     Slice.createTable(ifNotExists=True)
161     EucaInstance.createTable(ifNotExists=True)
162     IP.createTable(ifNotExists=True)
163
164     # Start the update thread to keep track of the meta data
165     # about Eucalyptus instance.
166     Thread(target=updateMeta).start()
167
168     # Make sure the schema exists.
169     if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA):
170         err = 'Cannot location schema at %s' % EUCALYPTUS_RSPEC_SCHEMA
171         logger.error(err)
172         raise Exception(err)
173
174 ##
175 # Creates a connection to Eucalytpus. This function is inspired by 
176 # the make_connection() in Euca2ools.
177 #
178 # @return A connection object or None
179 #
180 def getEucaConnection():
181     global cloud
182     accessKey = cloud['access_key']
183     secretKey = cloud['secret_key']
184     eucaURL   = cloud['cloud_url']
185     useSSL    = False
186     srvPath   = '/'
187     eucaPort  = 8773
188     logger    = logging.getLogger('EucaAggregate')
189
190     if not accessKey or not secretKey or not eucaURL:
191         logger.error('Please set ALL of the required environment ' \
192                      'variables by sourcing the eucarc file.')
193         return None
194     
195     # Split the url into parts
196     if eucaURL.find('https://') >= 0:
197         useSSL  = True
198         eucaURL = eucaURL.replace('https://', '')
199     elif eucaURL.find('http://') >= 0:
200         useSSL  = False
201         eucaURL = eucaURL.replace('http://', '')
202     (eucaHost, parts) = eucaURL.split(':')
203     if len(parts) > 1:
204         parts = parts.split('/')
205         eucaPort = int(parts[0])
206         parts = parts[1:]
207         srvPath = '/'.join(parts)
208
209     return boto.connect_ec2(aws_access_key_id=accessKey,
210                             aws_secret_access_key=secretKey,
211                             is_secure=useSSL,
212                             region=RegionInfo(None, 'eucalyptus', eucaHost), 
213                             port=eucaPort,
214                             path=srvPath)
215
216 ##
217 # Returns a string of keys that belong to the users of the given slice.
218 # @param sliceHRN The hunman readable name of the slice.
219 # @return sting()
220 #
221 def getKeysForSlice(api, sliceHRN):
222     logger   = logging.getLogger('EucaAggregate')
223     cred     = api.getCredential()
224     registry = api.registries[api.hrn]
225     keys     = []
226
227     # Get the slice record
228     records = registry.Resolve(sliceHRN, cred)
229     if not records:
230         logging.warn('Cannot find any record for slice %s' % sliceHRN)
231         return []
232
233     # Find who can log into this slice
234     persons = records[0]['persons']
235
236     # Extract the keys from persons records
237     for p in persons:
238         sliceUser = registry.Resolve(p, cred)
239         userKeys = sliceUser[0]['keys']
240         keys += userKeys
241
242     return ''.join(keys)
243
244 ##
245 # A class that builds the RSpec for Eucalyptus.
246 #
247 class EucaRSpecBuilder(object):
248     ##
249     # Initizes a RSpec builder
250     #
251     # @param cloud A dictionary containing data about a 
252     #              cloud (ex. clusters, ip)
253     def __init__(self, cloud):
254         self.eucaRSpec = XMLBuilder(format = True, tab_step = "  ")
255         self.cloudInfo = cloud
256
257     ##
258     # Creates a request stanza.
259     # 
260     # @param num The number of instances to create.
261     # @param image The disk image id.
262     # @param kernel The kernel image id.
263     # @param keypair Key pair to embed.
264     # @param ramdisk Ramdisk id (optional).
265     #
266     def __requestXML(self, num, image, kernel, keypair, ramdisk = ''):
267         xml = self.eucaRSpec
268         with xml.request:
269             with xml.instances:
270                 xml << str(num)
271             with xml.kernel_image(id=kernel):
272                 xml << ''
273             if ramdisk == '':
274                 with xml.ramdisk:
275                     xml << ''
276             else:
277                 with xml.ramdisk(id=ramdisk):
278                     xml << ''
279             with xml.disk_image(id=image):
280                 xml << ''
281             with xml.keypair:
282                 xml << keypair
283
284     ##
285     # Creates the cluster stanza.
286     #
287     # @param clusters Clusters information.
288     #
289     def __clustersXML(self, clusters):
290         cloud = self.cloudInfo
291         xml = self.eucaRSpec
292
293         for cluster in clusters:
294             instances = cluster['instances']
295             with xml.cluster(id=cluster['name']):
296                 with xml.ipv4:
297                     xml << cluster['ip']
298                 with xml.vm_types:
299                     for inst in instances:
300                         with xml.vm_type(name=inst[0]):
301                             with xml.free_slots:
302                                 xml << str(inst[1])
303                             with xml.max_instances:
304                                 xml << str(inst[2])
305                             with xml.cores:
306                                 xml << str(inst[3])
307                             with xml.memory(unit='MB'):
308                                 xml << str(inst[4])
309                             with xml.disk_space(unit='GB'):
310                                 xml << str(inst[5])
311                             if 'instances' in cloud and inst[0] in cloud['instances']:
312                                 existingEucaInstances = cloud['instances'][inst[0]]
313                                 with xml.euca_instances:
314                                     for eucaInst in existingEucaInstances:
315                                         with xml.euca_instance(id=eucaInst['id']):
316                                             with xml.state:
317                                                 xml << eucaInst['state']
318                                             with xml.public_dns:
319                                                 xml << eucaInst['public_dns']
320
321     def __imageBundleXML(self, bundles):
322         xml = self.eucaRSpec
323         with xml.bundles:
324             for bundle in bundles.keys():
325                 with xml.bundle(id=bundle):
326                     xml << ''
327
328     ##
329     # Creates the Images stanza.
330     #
331     # @param images A list of images in Eucalyptus.
332     #
333     def __imagesXML(self, images):
334         xml = self.eucaRSpec
335         with xml.images:
336             for image in images:
337                 with xml.image(id=image.id):
338                     with xml.type:
339                         xml << image.type
340                     with xml.arch:
341                         xml << image.architecture
342                     with xml.state:
343                         xml << image.state
344                     with xml.location:
345                         xml << image.location
346
347     ##
348     # Creates the KeyPairs stanza.
349     #
350     # @param keypairs A list of key pairs in Eucalyptus.
351     #
352     def __keyPairsXML(self, keypairs):
353         xml = self.eucaRSpec
354         with xml.keypairs:
355             for key in keypairs:
356                 with xml.keypair:
357                     xml << key.name
358
359     ##
360     # Generates the RSpec.
361     #
362     def toXML(self):
363         logger = logging.getLogger('EucaAggregate')
364         if not self.cloudInfo:
365             logger.error('No cloud information')
366             return ''
367
368         xml = self.eucaRSpec
369         cloud = self.cloudInfo
370         with xml.RSpec(type='eucalyptus'):
371             with xml.network(id=cloud['name']):
372                 with xml.ipv4:
373                     xml << cloud['ip']
374                 #self.__keyPairsXML(cloud['keypairs'])
375                 #self.__imagesXML(cloud['images'])
376                 self.__imageBundleXML(cloud['imageBundles'])
377                 self.__clustersXML(cloud['clusters'])
378         return str(xml)
379
380 ##
381 # A parser to parse the output of availability-zones.
382 #
383 # Note: Only one cluster is supported. If more than one, this will
384 #       not work.
385 #
386 class ZoneResultParser(object):
387     def __init__(self, zones):
388         self.zones = zones
389
390     def parse(self):
391         if len(self.zones) < 3:
392             return
393         clusterList = []
394         cluster = {} 
395         instList = []
396
397         cluster['name'] = self.zones[0].name
398         cluster['ip']   = self.zones[0].state
399
400         for i in range(2, len(self.zones)):
401             currZone = self.zones[i]
402             instType = currZone.name.split()[1]
403
404             stateString = currZone.state.split('/')
405             rscString   = stateString[1].split()
406
407             instFree      = int(stateString[0])
408             instMax       = int(rscString[0])
409             instNumCpu    = int(rscString[1])
410             instRam       = int(rscString[2])
411             instDiskSpace = int(rscString[3])
412
413             instTuple = (instType, instFree, instMax, instNumCpu, instRam, instDiskSpace)
414             instList.append(instTuple)
415         cluster['instances'] = instList
416         clusterList.append(cluster)
417
418         return clusterList
419
420 def ListResources(api, creds, options, call_id): 
421     if Callids().already_handled(call_id): return ""
422     global cloud
423     # get slice's hrn from options
424     xrn = options.get('geni_slice_urn', '')
425     hrn, type = urn_to_hrn(xrn)
426     logger = logging.getLogger('EucaAggregate')
427
428     # get hrn of the original caller
429     origin_hrn = options.get('origin_hrn', None)
430     if not origin_hrn:
431         origin_hrn = Credential(string=creds).get_gid_caller().get_hrn()
432         # origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
433
434     conn = getEucaConnection()
435
436     if not conn:
437         logger.error('Cannot create a connection to Eucalyptus')
438         return 'Cannot create a connection to Eucalyptus'
439
440     try:
441         # Zones
442         zones = conn.get_all_zones(['verbose'])
443         p = ZoneResultParser(zones)
444         clusters = p.parse()
445         cloud['clusters'] = clusters
446         
447         # Images
448         images = conn.get_all_images()
449         cloud['images'] = images
450         cloud['imageBundles'] = {}
451         for i in images:
452             if i.type != 'machine' or i.kernel_id is None: continue
453             name = os.path.dirname(i.location)
454             detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
455             cloud['imageBundles'][name] = detail
456
457         # Key Pairs
458         keyPairs = conn.get_all_key_pairs()
459         cloud['keypairs'] = keyPairs
460
461         if hrn:
462             instanceId = []
463             instances  = []
464
465             # Get the instances that belong to the given slice from sqlite3
466             # XXX use getOne() in production because the slice's hrn is supposed
467             # to be unique. For testing, uniqueness is turned off in the db.
468             # If the slice isn't found in the database, create a record for the 
469             # slice.
470             matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
471             if matchedSlices:
472                 theSlice = matchedSlices[-1]
473             else:
474                 theSlice = Slice(slice_hrn = hrn)
475             for instance in theSlice.instances:
476                 instanceId.append(instance.instance_id)
477
478             # Get the information about those instances using their ids.
479             if len(instanceId) > 0:
480                 reservations = conn.get_all_instances(instanceId)
481             else:
482                 reservations = []
483             for reservation in reservations:
484                 for instance in reservation.instances:
485                     instances.append(instance)
486
487             # Construct a dictionary for the EucaRSpecBuilder
488             instancesDict = {}
489             for instance in instances:
490                 instList = instancesDict.setdefault(instance.instance_type, [])
491                 instInfoDict = {} 
492
493                 instInfoDict['id'] = instance.id
494                 instInfoDict['public_dns'] = instance.public_dns_name
495                 instInfoDict['state'] = instance.state
496                 instInfoDict['key'] = instance.key_name
497
498                 instList.append(instInfoDict)
499             cloud['instances'] = instancesDict
500
501     except EC2ResponseError, ec2RespErr:
502         errTree = ET.fromstring(ec2RespErr.body)
503         errMsgE = errTree.find('.//Message')
504         logger.error(errMsgE.text)
505
506     rspec = EucaRSpecBuilder(cloud).toXML()
507
508     # Remove the instances records so next time they won't 
509     # show up.
510     if 'instances' in cloud:
511         del cloud['instances']
512
513     return rspec
514
515 """
516 Hook called via 'sfi.py create'
517 """
518 def CreateSliver(api, xrn, creds, xml, users, call_id):
519     if Callids().already_handled(call_id): return ""
520
521     global cloud
522     hrn = urn_to_hrn(xrn)[0]
523     logger = logging.getLogger('EucaAggregate')
524
525     conn = getEucaConnection()
526     if not conn:
527         logger.error('Cannot create a connection to Eucalyptus')
528         return ""
529
530     # Validate RSpec
531     schemaXML = ET.parse(EUCALYPTUS_RSPEC_SCHEMA)
532     rspecValidator = ET.RelaxNG(schemaXML)
533     rspecXML = ET.XML(xml)
534     for network in rspecXML.iterfind("./network"):
535         if network.get('id') != cloud['name']:
536             # Throw away everything except my own RSpec
537             # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
538             network.getparent().remove(network)
539     if not rspecValidator(rspecXML):
540         error = rspecValidator.error_log.last_error
541         message = '%s (line %s)' % (error.message, error.line) 
542         # XXX: InvalidRSpec is new. Currently, I am not working with Trunk code.
543         #raise InvalidRSpec(message)
544         raise Exception(message)
545
546     # Get the slice from db or create one.
547     s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
548     if s is None:
549         s = Slice(slice_hrn = hrn)
550
551     # Process any changes in existing instance allocation
552     pendingRmInst = []
553     for sliceInst in s.instances:
554         pendingRmInst.append(sliceInst.instance_id)
555     existingInstGroup = rspecXML.findall(".//euca_instances")
556     for instGroup in existingInstGroup:
557         for existingInst in instGroup:
558             if existingInst.get('id') in pendingRmInst:
559                 pendingRmInst.remove(existingInst.get('id'))
560     for inst in pendingRmInst:
561         logger.debug('Instance %s will be terminated' % inst)
562         dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
563         # Only change the state but do not remove the entry from the DB.
564         dbInst.meta.state = 'deleted'
565         #dbInst.destroySelf()
566     conn.terminate_instances(pendingRmInst)
567
568     # Process new instance requests
569     requests = rspecXML.findall(".//request")
570     if requests:
571         # Get all the public keys associate with slice.
572         pubKeys = getKeysForSlice(api, s.slice_hrn)
573         logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
574     for req in requests:
575         vmTypeElement = req.getparent()
576         instType = vmTypeElement.get('name')
577         numInst  = int(req.find('instances').text)
578         
579         bundleName = req.find('bundle').text
580         if not cloud['imageBundles'][bundleName]:
581             logger.error('Cannot find bundle %s' % bundleName)
582         bundleInfo = cloud['imageBundles'][bundleName]
583         instKernel  = bundleInfo['kernelID']
584         instDiskImg = bundleInfo['imageID']
585         instRamDisk = bundleInfo['ramdiskID']
586         instKey     = None
587
588         # Create the instances
589         for i in range(0, numInst):
590             eucaInst = EucaInstance(slice      = s,
591                                     kernel_id  = instKernel,
592                                     image_id   = instDiskImg,
593                                     ramdisk_id = instRamDisk,
594                                     key_pair   = instKey,
595                                     inst_type  = instType,
596                                     meta       = Meta(start_time=datetime.datetime.now()))
597             eucaInst.reserveInstance(conn, pubKeys)
598
599     # xxx - should return altered rspec 
600     # with enough data for the client to understand what's happened
601     return xml
602
603 ##
604 # A thread that will update the meta data.
605 #
606 def updateMeta():
607     logger = logging.getLogger('EucaAggregate')
608     while True:
609         sleep(120)
610
611         # Get IDs of the instances that don't have IPs yet.
612         dbResults = Meta.select(
613                       AND(Meta.q.pri_addr == None,
614                           Meta.q.state    != 'deleted')
615                     )
616         dbResults = list(dbResults)
617         logger.debug('[update thread] dbResults: %s' % dbResults)
618         instids = []
619         for r in dbResults:
620             instids.append(r.instance.instance_id)
621         logger.debug('[update thread] Instance Id: %s' % ', '.join(instids))
622
623         # Get instance information from Eucalyptus
624         conn = getEucaConnection()
625         vmInstances = []
626         reservations = conn.get_all_instances(instids)
627         for reservation in reservations:
628             vmInstances += reservation.instances
629
630         # Check the IPs
631         instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
632                     for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
633         logger.debug('[update thread] IP dict: %s' % str(instIPs))
634
635         # Update the local DB
636         for ipData in instIPs:
637             dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
638             if not dbInst:
639                 logger.info('[update thread] Could not find %s in DB' % ipData['id'])
640                 continue
641             dbInst.meta.pri_addr = ipData['pri_addr']
642             dbInst.meta.pub_addr = ipData['pub_addr']
643             dbInst.meta.state    = 'running'
644
645 def GetVersion(api):
646     xrn=Xrn(api.hrn)
647     request_rspec_versions = [dict(sfa_rspec_version)]
648     ad_rspec_versions = [dict(sfa_rspec_version)]
649     version_more = {'interface':'aggregate',
650                     'testbed':'myplc',
651                     'hrn':xrn.get_hrn(),
652                     'request_rspec_versions': request_rspec_versions,
653                     'ad_rspec_versions': ad_rspec_versions,
654                     'default_ad_rspec': dict(sfa_rspec_version)
655                     }
656     return version_core(version_more)
657
658 def main():
659     init_server()
660
661     #theRSpec = None
662     #with open(sys.argv[1]) as xml:
663     #    theRSpec = xml.read()
664     #CreateSliver(None, 'planetcloud.pc.test', theRSpec, 'call-id-cloudtest')
665
666     #rspec = ListResources('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy', 'test_euca')
667     #print rspec
668
669     server_key_file = '/var/lib/sfa/authorities/server.key'
670     server_cert_file = '/var/lib/sfa/authorities/server.cert'
671     api = SfaAPI(key_file = server_key_file, cert_file = server_cert_file, interface='aggregate')
672     print getKeysForSlice(api, 'gc.gc.test1')
673
674 if __name__ == "__main__":
675     main()
676