1 #----------------------------------------------------------------------
2 # Copyright (c) 2008 Board of Trustees, Princeton University
4 # Permission is hereby granted, free of charge, to any person obtaining
5 # a copy of this software and/or hardware specification (the "Work") to
6 # deal in the Work without restriction, including without limitation the
7 # rights to use, copy, modify, merge, publish, distribute, sublicense,
8 # and/or sell copies of the Work, and to permit persons to whom the Work
9 # is furnished to do so, subject to the following conditions:
11 # The above copyright notice and this permission notice shall be
12 # included in all copies or substantial portions of the Work.
14 # THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
22 #----------------------------------------------------------------------
25 # SFA uses two crypto libraries: pyOpenSSL and M2Crypto to implement
26 # the necessary crypto functionality. Ideally just one of these libraries
27 # would be used, but unfortunately each of these libraries is independently
28 # lacking. The pyOpenSSL library is missing many necessary functions, and
29 # the M2Crypto library has crashed inside of some of the functions. The
30 # design decision is to use pyOpenSSL whenever possible as it seems more
31 # stable, and only use M2Crypto for those functions that are not possible
34 # This module exports two classes: Keypair and Certificate.
42 from tempfile import mkstemp
44 from OpenSSL import crypto
46 from M2Crypto import X509
48 from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent
49 from sfa.util.sfalogging import logger
51 glo_passphrase_callback = None
54 # A global callback may be implemented for requesting passphrases from the
55 # user. The function will be called with three arguments:
57 # keypair_obj: the keypair object that is calling the passphrase
58 # string: the string containing the private key that's being loaded
59 # x: unknown, appears to be 0, comes from pyOpenSSL and/or m2crypto
61 # The callback should return a string containing the passphrase.
def set_passphrase_callback(callback_func):
    """Register the module-wide passphrase callback.

    The callable is stored in the module global ``glo_passphrase_callback``.
    Per the module comments it is invoked with three arguments (the keypair
    object, the private key string, and an opaque value from the crypto
    library) and should return the passphrase as a string.

    @param callback_func callable to store (whatever is passed is stored as-is)
    """
    global glo_passphrase_callback
    # Rebind the module-level hook to the supplied callable.
    glo_passphrase_callback = callback_func
69 # Sets a fixed passphrase.
def set_passphrase(passphrase):
    """Install a passphrase callback that always answers with ``passphrase``.

    @param passphrase value returned by the installed callback on every call
    """
    def _fixed_passphrase(k, s, x):
        # All three callback arguments are ignored; the answer is constant.
        return passphrase

    set_passphrase_callback(_fixed_passphrase)
75 # Check to see if a passphrase works for a particular private key string.
76 # Intended to be used by passphrase callbacks for input validation.
78 def test_passphrase(string, passphrase):
80 crypto.load_privatekey(crypto.FILETYPE_PEM, string, (lambda x: passphrase))
85 def convert_public_key(key):
86 keyconvert_path = "/usr/bin/keyconvert.py"
87 if not os.path.isfile(keyconvert_path):
88 raise IOError, "Could not find keyconvert in %s" % keyconvert_path
90 # we can only convert rsa keys
94 (ssh_f, ssh_fn) = tempfile.mkstemp()
95 ssl_fn = tempfile.mktemp()
99 cmd = keyconvert_path + " " + ssh_fn + " " + ssl_fn
102 # this check leaves the temporary file containing the public key so
103 # that it can be inspected to see why it failed.
104 # TODO: for production, cleanup the temporary files
105 if not os.path.exists(ssl_fn):
110 k.load_pubkey_from_file(ssl_fn)
112 logger.log_exc("convert_public_key caught exception")
115 # remove the temporary files
122 # Public-private key pairs are implemented by the Keypair class.
123 # A Keypair object may represent both a public and private key pair, or it
124 # may represent only a public key (this usage is consistent with OpenSSL).
127 key = None # public/private keypair
128 m2key = None # public key (m2crypto format)
131 # Creates a Keypair object
132 # @param create If create==True, creates a new public/private key and
133 # stores it in the object
134 # @param string If string!=None, load the keypair from the string (PEM)
135 # @param filename If filename!=None, load the keypair from the file
137 def __init__(self, create=False, string=None, filename=None):
141 self.load_from_string(string)
143 self.load_from_file(filename)
146 # Create a RSA public/private key pair and store it inside the keypair object
149 self.key = crypto.PKey()
150 self.key.generate_key(crypto.TYPE_RSA, 1024)
153 # Save the private key to a file
154 # @param filename name of file to store the keypair in
def save_to_file(self, filename):
    """Write the PEM form of the key (``self.as_pem()``) to ``filename``.

    The PEM text is produced before the file is opened, so a failure in
    ``as_pem()`` no longer leaves a truncated file behind, and the file
    handle is closed deterministically instead of being left to the
    garbage collector.

    @param filename name of file to store the keypair in
    """
    pem = self.as_pem()
    with open(filename, 'w') as f:
        f.write(pem)
    # Remember where the key was stored; get_filename() reports this.
    self.filename = filename
161 # Load the private key from a file. Implicitly the private key includes the public key.
def load_from_file(self, filename):
    """Read ``filename`` and hand its contents to ``self.load_from_string``.

    ``self.filename`` is recorded before the file is opened, as in the
    original code. The file is read inside a ``with`` block so the handle
    is always closed, and the local no longer shadows the ``buffer`` builtin.

    @param filename path of the file holding the key
    """
    self.filename = filename
    with open(filename, 'r') as f:
        contents = f.read()
    self.load_from_string(contents)
169 # Load the private key from a string. Implicitly the private key includes the public key.
171 def load_from_string(self, string):
172 if glo_passphrase_callback:
173 self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string) )
174 self.m2key = M2Crypto.EVP.load_key_string(string, functools.partial(glo_passphrase_callback, self, string) )
176 self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string)
177 self.m2key = M2Crypto.EVP.load_key_string(string)
180 # Load the public key from a file. No private key is loaded.
182 def load_pubkey_from_file(self, filename):
183 # load the m2 public key
184 m2rsakey = M2Crypto.RSA.load_pub_key(filename)
185 self.m2key = M2Crypto.EVP.PKey()
186 self.m2key.assign_rsa(m2rsakey)
188 # create an m2 x509 cert
189 m2name = M2Crypto.X509.X509_Name()
190 m2name.add_entry_by_txt(field="CN", type=0x1001, entry="junk", len=-1, loc=-1, set=0)
191 m2x509 = M2Crypto.X509.X509()
192 m2x509.set_pubkey(self.m2key)
193 m2x509.set_serial_number(0)
194 m2x509.set_issuer_name(m2name)
195 m2x509.set_subject_name(m2name)
196 ASN1 = M2Crypto.ASN1.ASN1_UTCTIME()
198 m2x509.set_not_before(ASN1)
199 m2x509.set_not_after(ASN1)
200 # x509v3 so it can have extensions
201 # prob not necc since this cert itself is junk but still...
202 m2x509.set_version(2)
203 junk_key = Keypair(create=True)
204 m2x509.sign(pkey=junk_key.get_m2_pkey(), md="sha1")
206 # convert the m2 x509 cert to a pyopenssl x509
207 m2pem = m2x509.as_pem()
208 pyx509 = crypto.load_certificate(crypto.FILETYPE_PEM, m2pem)
210 # get the pyopenssl pkey from the pyopenssl x509
211 self.key = pyx509.get_pubkey()
212 self.filename=filename
215 # Load the public key from a string. No private key is loaded.
217 def load_pubkey_from_string(self, string):
218 (f, fn) = tempfile.mkstemp()
221 self.load_pubkey_from_file(fn)
225 # Return the private key in PEM format.
228 return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.key)
231 # Return an M2Crypto key object
233 def get_m2_pkey(self):
235 self.m2key = M2Crypto.EVP.load_key_string(self.as_pem())
239 # Returns a string containing the public key represented by this object.
def get_pubkey_string(self):
    """Return the base64 encoding of the DER form of this object's M2Crypto key.

    The key object comes from ``self.get_m2_pkey()``; its ``as_der()``
    output is base64-encoded and returned.
    """
    der_bytes = self.get_m2_pkey().as_der()
    return base64.b64encode(der_bytes)
246 # Return an OpenSSL pkey object
248 def get_openssl_pkey(self):
252 # Given another Keypair object, return TRUE if the two keys are the same.
def is_same(self, pkey):
    """Tell whether ``pkey`` serialises to the same PEM text as this object.

    @param pkey another object exposing ``as_pem()``
    @return True when both ``as_pem()`` results compare equal
    """
    mine = self.as_pem()
    theirs = pkey.as_pem()
    return mine == theirs
257 def sign_string(self, data):
258 k = self.get_m2_pkey()
261 return base64.b64encode(k.sign_final())
263 def verify_string(self, data, sig):
264 k = self.get_m2_pkey()
266 k.verify_update(data)
267 return M2Crypto.m2.verify_final(k.ctx, base64.b64decode(sig), k.pkey)
def compute_hash(self, value):
    """Return ``self.sign_string`` applied to the ``str()`` form of ``value``.

    @param value any object; it is converted with ``str()`` before signing
    """
    text = str(value)
    return self.sign_string(text)
def get_filename(self):
    """Return the ``filename`` attribute of this object, or None if it has none."""
    try:
        return self.filename
    except AttributeError:
        # No filename attribute has been set on this object.
        return None
def dump(self, *args, **kwargs):
    """Print the text produced by ``self.dump_string`` to standard output.

    All positional and keyword arguments are forwarded unchanged to
    ``dump_string``. The parenthesised single-argument ``print`` form is
    used so the statement is valid on both Python 2 and Python 3.
    """
    print(self.dump_string(*args, **kwargs))
279 def dump_string (self):
281 result += "KEYPAIR: pubkey=%40s..."%self.get_pubkey_string()
282 filename=self.get_filename()
283 if filename: result += "Filename %s\n"%filename
287 # The certificate class implements a general purpose X509 certificate, making
288 # use of the appropriate pyOpenSSL or M2Crypto abstractions. It also adds
289 # several additional features, such as the ability to maintain a chain of
290 # parent certificates, and storage of application-specific data.
292 # Certificates include the ability to maintain a chain of parents. Each
293 # certificate includes a pointer to its parent certificate. When loaded
294 # from a file or a string, the parent chain will be automatically loaded.
295 # When saving a certificate to a file or a string, the caller can choose
296 # whether to save the parent certificates as well.
305 isCA = None # will be a boolean once set
307 separator="-----parent-----"
310 # Create a certificate object.
312 # @param lifeDays life of cert in days - default is 1825==5 years
313 # @param create If create==True, then also create a blank X509 certificate.
314 # @param subject If subject!=None, then create a blank certificate and set
316 # @param string If string!=None, load the certificate from the string.
317 # @param filename If filename!=None, load the certificate from the file.
318 # @param isCA If !=None, set whether this cert is for a CA
320 def __init__(self, lifeDays=1825, create=False, subject=None, string=None, filename=None, isCA=None):
322 if create or subject:
323 self.create(lifeDays)
325 self.set_subject(subject)
327 self.load_from_string(string)
329 self.load_from_file(filename)
331 # Set the CA bit if a value was supplied
335 # Create a blank X509 certificate and store it in this object.
def create(self, lifeDays=1825):
    """Build a blank pyOpenSSL X509 object and store it in ``self.cert``.

    @param lifeDays validity period in days, counted from now
                    (the default 1825 is five years)
    """
    seconds_per_day = 60 * 60 * 24
    x509 = crypto.X509()
    self.cert = x509
    # FIXME: Use different serial #s
    x509.set_serial_number(3)
    # Validity window: starts now (offset 0) and lasts lifeDays days.
    x509.gmtime_adj_notBefore(0)
    x509.gmtime_adj_notAfter(lifeDays * seconds_per_day)
    # Version value 2 denotes x509v3, which is needed for extensions.
    x509.set_version(2)
347 # Given a pyOpenSSL X509 object, store that object inside of this
348 # certificate object.
350 def load_from_pyopenssl_x509(self, x509):
354 # Load the certificate from a string
356 def load_from_string(self, string):
357 # if it is a chain of multiple certs, then split off the first one and
358 # load it (support for the ---parent--- tag as well as normal chained certs)
360 string = string.strip()
362 # If it's not in proper PEM format, wrap it
363 if string.count('-----BEGIN CERTIFICATE') == 0:
364 string = '-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----' % string
366 # If there is a PEM cert in there, but there is some other text first
367 # such as the text of the certificate, skip the text
368 beg = string.find('-----BEGIN CERTIFICATE')
370 # skipping over non cert beginning
371 string = string[beg:]
375 if string.count('-----BEGIN CERTIFICATE-----') > 1 and \
376 string.count(Certificate.separator) == 0:
377 parts = string.split('-----END CERTIFICATE-----',1)
378 parts[0] += '-----END CERTIFICATE-----'
380 parts = string.split(Certificate.separator, 1)
382 self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, parts[0])
384 # if there are more certs, then create a parent and let the parent load
385 # itself from the remainder of the string
386 if len(parts) > 1 and parts[1] != '':
387 self.parent = self.__class__()
388 self.parent.load_from_string(parts[1])
391 # Load the certificate from a file
393 def load_from_file(self, filename):
394 file = open(filename)
396 self.load_from_string(string)
397 self.filename=filename
400 # Save the certificate to a string.
402 # @param save_parents If save_parents==True, then also save the parent certificates.
404 def save_to_string(self, save_parents=True):
405 string = crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert)
406 if save_parents and self.parent:
407 string = string + self.parent.save_to_string(save_parents)
411 # Save the certificate to a file.
412 # @param save_parents If save_parents==True, then also save the parent certificates.
414 def save_to_file(self, filename, save_parents=True, filep=None):
415 string = self.save_to_string(save_parents=save_parents)
419 f = open(filename, 'w')
422 self.filename=filename
425 # Save the certificate to a random file in /tmp/
426 # @param save_parents If save_parents==True, then also save the parent certificates.
427 def save_to_random_tmp_file(self, save_parents=True):
428 fp, filename = mkstemp(suffix='cert', text=True)
429 fp = os.fdopen(fp, "w")
430 self.save_to_file(filename, save_parents=True, filep=fp)
434 # Sets the issuer private key and name
435 # @param key Keypair object containing the private key of the issuer
436 # @param subject String containing the name of the issuer
437 # @param cert (optional) Certificate object containing the name of the issuer
439 def set_issuer(self, key, subject=None, cert=None):
442 # it's a mistake to use subject and cert params at the same time
444 if isinstance(subject, dict) or isinstance(subject, str):
445 req = crypto.X509Req()
446 reqSubject = req.get_subject()
447 if (isinstance(subject, dict)):
448 for key in reqSubject.keys():
449 setattr(reqSubject, key, subject[key])
451 setattr(reqSubject, "CN", subject)
453 # subject is not valid once req is out of scope, so save req
456 # if a cert was supplied, then get the subject from the cert
457 subject = cert.cert.get_subject()
459 self.issuerSubject = subject
462 # Get the issuer name
def get_issuer(self, which="CN"):
    """Return one attribute of the certificate's issuer name.

    @param which name of the attribute to read from ``self.cert.get_issuer()``
                 (default "CN")
    """
    return getattr(self.cert.get_issuer(), which)
469 # Set the subject name of the certificate
471 def set_subject(self, name):
472 req = crypto.X509Req()
473 subj = req.get_subject()
474 if (isinstance(name, dict)):
475 for key in name.keys():
476 setattr(subj, key, name[key])
478 setattr(subj, "CN", name)
479 self.cert.set_subject(subj)
482 # Get the subject name of the certificate
def get_subject(self, which="CN"):
    """Return one attribute of the certificate's subject name.

    @param which name of the attribute to read from ``self.cert.get_subject()``
                 (default "CN")
    """
    return getattr(self.cert.get_subject(), which)
489 # Get a pretty-print subject name of the certificate
def get_printable_subject(self):
    """Return a one-line, human-readable summary of the certificate subject.

    The summary combines the subject's OU and CN attributes with the value
    of ``self.get_data()``, which is labelled SubjectAltName in the output.
    """
    subject = self.cert.get_subject()
    org_unit = subject.OU
    common_name = subject.CN
    alt_name = self.get_data()
    return "[ OU: %s, CN: %s, SubjectAltName: %s ]" % (org_unit, common_name, alt_name)
496 # Set the public key of the certificate.
498 # @param key Keypair object containing the public key
def set_pubkey(self, key):
    """Store the public key of ``key`` in the underlying certificate.

    @param key Keypair object; its ``get_openssl_pkey()`` result is passed
               to ``self.cert.set_pubkey``
    """
    # Sanity check only: this is an assert, so it is skipped under -O.
    assert isinstance(key, Keypair)
    openssl_key = key.get_openssl_pkey()
    self.cert.set_pubkey(openssl_key)
505 # Get the public key of the certificate.
506 # It is returned in the form of a Keypair object.
508 def get_pubkey(self):
509 m2x509 = X509.load_cert_string(self.save_to_string())
511 pkey.key = self.cert.get_pubkey()
512 pkey.m2key = m2x509.get_pubkey()
def set_intermediate_ca(self, val):
    """Forward ``val`` to ``self.set_is_ca`` and return whatever it returns.

    @param val value passed through unchanged to ``set_is_ca``
    """
    outcome = self.set_is_ca(val)
    return outcome
518 # Set whether this cert is for a CA. All signers and only signers should be CAs.
519 # The local member starts unset, letting us check that you only set it once
520 # @param val Boolean indicating whether this cert is for a CA
521 def set_is_ca(self, val):
525 if self.isCA != None:
526 # Can't double set properties
527 raise Exception, "Cannot set basicConstraints CA:?? more than once. Was %s, trying to set as %s" % (self.isCA, val)
531 self.add_extension('basicConstraints', 1, 'CA:TRUE')
533 self.add_extension('basicConstraints', 1, 'CA:FALSE')
538 # Add an X509 extension to the certificate. Add_extension can only be called
539 # once for a particular extension name, due to limitations in the underlying
542 # @param name string containing name of extension
543 # @param value string containing value of the extension
545 def add_extension(self, name, critical, value):
548 oldExtVal = self.get_extension(name)
550 # M2Crypto LookupError when the extension isn't there (yet)
553 # This code limits you from adding the extension with the same value
554 # The method comment says you shouldn't do this with the same name
555 # But actually it (m2crypto) appears to allow you to do this.
556 if oldExtVal and oldExtVal == value:
557 # don't add this extension again
558 # just do nothing as here
560 # FIXME: What if they are trying to set with a different value?
561 # Is this ever OK? Or should we raise an exception?
563 # raise "Cannot add extension %s which had val %s with new val %s" % (name, oldExtVal, value)
565 ext = crypto.X509Extension (name, critical, value)
566 self.cert.add_extensions([ext])
569 # Get an X509 extension from the certificate
571 def get_extension(self, name):
573 # pyOpenSSL does not have a way to get extensions
574 m2x509 = X509.load_cert_string(self.save_to_string())
575 value = m2x509.get_ext(name).get_value()
580 # Set_data is a wrapper around add_extension. It stores the parameter str in
581 # the X509 subject_alt_name extension. Set_data can only be called once, due
582 # to limitations in the underlying library.
def set_data(self, str, field='subjectAltName'):
    """Record ``str`` under ``field`` and add it as an X509 extension.

    The value is cached in ``self.data[field]`` and then passed to
    ``self.add_extension(field, 0, str)``.

    @param str value to store (the parameter name shadows the builtin but
               is kept for backward compatibility with keyword callers)
    @param field name of the extension / data slot (default 'subjectAltName')
    @raise Exception if ``field`` has already been set on this object
    """
    # pyOpenSSL only allows us to add extensions, so if we try to set the
    # same extension more than once, it will not work
    if field in self.data:
        # The original raised a bare string, which is itself a TypeError on
        # Python 2.6+; raise a real exception carrying the intended message.
        raise Exception("Cannot set %s more than once" % field)
    self.data[field] = str
    self.add_extension(field, 0, str)
593 # Return the data string that was previously set with set_data
595 def get_data(self, field='subjectAltName'):
596 if self.data.has_key(field):
597 return self.data[field]
600 uri = self.get_extension(field)
601 self.data[field] = uri
605 return self.data[field]
608 # Sign the certificate using the issuer private key and issuer subject previously set with set_issuer().
611 logger.debug('certificate.sign')
612 assert self.cert != None
613 assert self.issuerSubject != None
614 assert self.issuerKey != None
615 self.cert.set_issuer(self.issuerSubject)
616 self.cert.sign(self.issuerKey.get_openssl_pkey(), self.digest)
619 # Verify the authenticity of a certificate.
620 # @param pkey is a Keypair object representing a public key. If Pkey
621 # did not sign the certificate, then an exception will be thrown.
623 def verify(self, pkey):
624 # pyOpenSSL does not have a way to verify signatures
625 m2x509 = X509.load_cert_string(self.save_to_string())
626 m2pkey = pkey.get_m2_pkey()
628 return m2x509.verify(m2pkey)
630 # XXX alternatively, if openssl has been patched, do the much simpler:
632 # self.cert.verify(pkey.get_openssl_key())
638 # Return True if pkey is identical to the public key that is contained in the certificate.
639 # @param pkey Keypair object
def is_pubkey(self, pkey):
    """Tell whether ``pkey`` matches the public key held by this certificate.

    @param pkey Keypair object to compare against
    @return the result of ``is_same(pkey)`` on the key from ``self.get_pubkey()``
    """
    own_key = self.get_pubkey()
    return own_key.is_same(pkey)
645 # Given a certificate cert, verify that this certificate was signed by the
646 # public key contained in cert. Throw an exception otherwise.
648 # @param cert certificate object
650 def is_signed_by_cert(self, cert):
651 k = cert.get_pubkey()
652 result = self.verify(k)
656 # Set the parent certificate.
658 # @param p certificate object.
660 def set_parent(self, p):
664 # Return the certificate object of the parent of this certificate.
666 def get_parent(self):
670 # Verification examines a chain of certificates to ensure that each parent
671 # signs the child, and that some certificate in the chain is signed by a
672 # trusted certificate.
674 # Verification is a basic recursion: <pre>
675 # if this_certificate was signed by trusted_certs:
678 # return verify_chain(parent, trusted_certs)
681 # At each recursion, the parent is tested to ensure that it did sign the
682 # child. If a parent did not sign a child, then an exception is thrown. If
683 # the bottom of the recursion is reached and the certificate does not match
684 # a trusted root, then an exception is thrown.
685 # Also require that parents are CAs.
687 # @param Trusted_certs is a list of certificates that are trusted.
690 def verify_chain(self, trusted_certs = None):
691 # Verify a chain of certificates. Each certificate must be signed by
692 # the public key contained in its parent. The chain is recursed
693 # until a certificate is found that is signed by a trusted root.
695 # verify expiration time
696 if self.cert.has_expired():
697 logger.debug("verify_chain: NO, Certificate %s has expired" % self.get_printable_subject())
698 raise CertExpired(self.get_printable_subject(), "client cert")
700 # if this cert is signed by a trusted_cert, then we are set
701 for trusted_cert in trusted_certs:
702 if self.is_signed_by_cert(trusted_cert):
703 # verify expiration of trusted_cert ?
704 if not trusted_cert.cert.has_expired():
705 logger.debug("verify_chain: YES. Cert %s signed by trusted cert %s"%(
706 self.get_printable_subject(), trusted_cert.get_printable_subject()))
709 logger.debug("verify_chain: NO. Cert %s is signed by trusted_cert %s, but that signer is expired..."%(
710 self.get_printable_subject(),trusted_cert.get_printable_subject()))
711 raise CertExpired(self.get_printable_subject()," signer trusted_cert %s"%trusted_cert.get_printable_subject())
713 # if there is no parent, then no way to verify the chain
715 logger.debug("verify_chain: NO. %s has no parent and issuer %s is not in %d trusted roots"%(self.get_printable_subject(), self.get_issuer(), len(trusted_certs)))
716 raise CertMissingParent(self.get_printable_subject() + ": Issuer %s not trusted by any of %d trusted roots, and cert has no parent." % (self.get_issuer(), len(trusted_certs)))
718 # if it wasn't signed by the parent...
719 if not self.is_signed_by_cert(self.parent):
720 logger.debug("verify_chain: NO. %s is not signed by parent %s, but by %s"%\
721 (self.get_printable_subject(),
722 self.parent.get_printable_subject(),
724 raise CertNotSignedByParent("%s: Parent %s, issuer %s"\
725 % (self.get_printable_subject(),
726 self.parent.get_printable_subject(),
729 # Confirm that the parent is a CA. Only CAs can be trusted as
731 # Note that trusted roots are not parents, so don't need to be
733 # Ugly - cert objects aren't parsed so we need to read the
734 # extension and hope there are no other basicConstraints
735 if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
736 logger.warn("verify_chain: cert %s's parent %s is not a CA" % \
737 (self.get_printable_subject(), self.parent.get_printable_subject()))
738 raise CertNotSignedByParent("%s: Parent %s not a CA" % (self.get_printable_subject(),
739 self.parent.get_printable_subject()))
741 # if the parent isn't verified...
742 logger.debug("verify_chain: .. %s, -> verifying parent %s"%\
743 (self.get_printable_subject(),self.parent.get_printable_subject()))
744 self.parent.verify_chain(trusted_certs)
748 ### more introspection
749 def get_extensions(self):
750 # pyOpenSSL does not have a way to get extensions
752 m2x509 = X509.load_cert_string(self.save_to_string())
753 nb_extensions=m2x509.get_ext_count()
754 logger.debug("X509 had %d extensions"%nb_extensions)
755 for i in range(nb_extensions):
756 ext=m2x509.get_ext_at(i)
757 triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) )
def get_data_names(self):
    """Return the keys of ``self.data`` (the field names stored there)."""
    names = self.data.keys()
    return names
763 def get_all_datas (self):
764 triples=self.get_extensions()
765 for name in self.get_data_names():
766 triples.append( (name,self.get_data(name),'data',) )
def get_filename(self):
    """Return the ``filename`` attribute of this object, or None if it has none."""
    try:
        return self.filename
    except AttributeError:
        # No filename attribute has been set on this object.
        return None
def dump(self, *args, **kwargs):
    """Print the text produced by ``self.dump_string`` to standard output.

    All positional and keyword arguments are forwarded unchanged to
    ``dump_string``. The parenthesised single-argument ``print`` form is
    used so the statement is valid on both Python 2 and Python 3.
    """
    print(self.dump_string(*args, **kwargs))
776 def dump_string (self,show_extensions=False):
778 result += "CERTIFICATE for %s\n"%self.get_printable_subject()
779 result += "Issued by %s\n"%self.get_issuer()
780 filename=self.get_filename()
781 if filename: result += "Filename %s\n"%filename
783 all_datas=self.get_all_datas()
784 result += " has %d extensions/data attached"%len(all_datas)
785 for (n,v,c) in all_datas:
787 result += " data: %s=%s\n"%(n,v)
789 result += " ext: %s (crit=%s)=<<<%s>>>\n"%(n,c,v)