Python3 compatibility for Credential & Certificate in save_to_string & save_to_file...
[sfa.git] / sfa / trust / certificate.py
1 #----------------------------------------------------------------------
2 # Copyright (c) 2008 Board of Trustees, Princeton University
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining
5 # a copy of this software and/or hardware specification (the "Work") to
6 # deal in the Work without restriction, including without limitation the
7 # rights to use, copy, modify, merge, publish, distribute, sublicense,
8 # and/or sell copies of the Work, and to permit persons to whom the Work
9 # is furnished to do so, subject to the following conditions:
10 #
11 # The above copyright notice and this permission notice shall be
12 # included in all copies or substantial portions of the Work.
13 #
14 # THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
18 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 
20 # OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS 
21 # IN THE WORK.
22 #----------------------------------------------------------------------
23
24 ##
25 # SFA uses two crypto libraries: pyOpenSSL and M2Crypto to implement
26 # the necessary crypto functionality. Ideally just one of these libraries
27 # would be used, but unfortunately each of these libraries is independently
28 # lacking. The pyOpenSSL library is missing many necessary functions, and
29 # the M2Crypto library has crashed inside of some of the functions. The
30 # design decision is to use pyOpenSSL whenever possible as it seems more
31 # stable, and only use M2Crypto for those functions that are not possible
32 # in pyOpenSSL.
33 #
34 # This module exports two classes: Keypair and Certificate.
35 ##
36 #
37
38 from __future__ import print_function
39
40 import functools
41 import os
42 import tempfile
43 import base64
44 from tempfile import mkstemp
45
46 import OpenSSL
47 # M2Crypto is imported on the fly to minimize crashes
48 #import M2Crypto
49
50 from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent
51 from sfa.util.sfalogging import logger
52
53 # this tends to generate quite some logs for little or no value
54 debug_verify_chain = False
55
56 glo_passphrase_callback = None
57
58 ##
59 # A global callback may be implemented for requesting passphrases from the
60 # user. The function will be called with three arguments:
61 #
62 #    keypair_obj: the keypair object that is calling the passphrase
63 #    string: the string containing the private key that's being loaded
64 #    x: unknown, appears to be 0, comes from pyOpenSSL and/or m2crypto
65 #
66 # The callback should return a string containing the passphrase.
67
68 def set_passphrase_callback(callback_func):
69     global glo_passphrase_callback
70
71     glo_passphrase_callback = callback_func
72
73 ##
74 # Sets a fixed passphrase.
75
76 def set_passphrase(passphrase):
77     set_passphrase_callback( lambda k,s,x: passphrase )
78
79 ##
80 # Check to see if a passphrase works for a particular private key string.
81 # Intended to be used by passphrase callbacks for input validation.
82
83 def test_passphrase(string, passphrase):
84     try:
85         OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, string, (lambda x: passphrase))
86         return True
87     except:
88         return False
89
90 def convert_public_key(key):
91     keyconvert_path = "/usr/bin/keyconvert.py"
92     if not os.path.isfile(keyconvert_path):
93         raise IOError("Could not find keyconvert in {}".format(keyconvert_path))
94
95     # we can only convert rsa keys
96     if "ssh-dss" in key:
97         raise Exception("keyconvert: dss keys are not supported")
98
99     (ssh_f, ssh_fn) = tempfile.mkstemp()
100     ssl_fn = tempfile.mktemp()
101     os.write(ssh_f, key)
102     os.close(ssh_f)
103
104     cmd = keyconvert_path + " " + ssh_fn + " " + ssl_fn
105     os.system(cmd)
106
107     # this check leaves the temporary file containing the public key so
108     # that it can be expected to see why it failed.
109     # TODO: for production, cleanup the temporary files
110     if not os.path.exists(ssl_fn):
111         raise Exception("keyconvert: generated certificate not found. keyconvert may have failed.")
112
113     k = Keypair()
114     try:
115         k.load_pubkey_from_file(ssl_fn)
116         return k
117     except:
118         logger.log_exc("convert_public_key caught exception")
119         raise
120     finally:
121         # remove the temporary files
122         if os.path.exists(ssh_fn):
123             os.remove(ssh_fn)
124         if os.path.exists(ssl_fn):
125             os.remove(ssl_fn)
126
127 ##
128 # Public-private key pairs are implemented by the Keypair class.
129 # A Keypair object may represent both a public and private key pair, or it
130 # may represent only a public key (this usage is consistent with OpenSSL).
131
132 class Keypair:
133     key = None       # public/private keypair
134     m2key = None     # public key (m2crypto format)
135
136     ##
137     # Creates a Keypair object
138     # @param create If create==True, creates a new public/private key and
139     #     stores it in the object
140     # @param string If string != None, load the keypair from the string (PEM)
141     # @param filename If filename != None, load the keypair from the file
142
143     def __init__(self, create=False, string=None, filename=None):
144         if create:
145             self.create()
146         if string:
147             self.load_from_string(string)
148         if filename:
149             self.load_from_file(filename)
150
151     ##
152     # Create a RSA public/private key pair and store it inside the keypair object
153
154     def create(self):
155         self.key = OpenSSL.crypto.PKey()
156         self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
157
158     ##
159     # Save the private key to a file
160     # @param filename name of file to store the keypair in
161
162     def save_to_file(self, filename):
163         open(filename, 'w').write(self.as_pem())
164         self.filename = filename
165
166     ##
167     # Load the private key from a file. Implicity the private key includes the public key.
168
169     def load_from_file(self, filename):
170         self.filename = filename
171         buffer = open(filename, 'r').read()
172         self.load_from_string(buffer)
173
174     ##
175     # Load the private key from a string. Implicitly the private key includes the public key.
176
177     def load_from_string(self, string):
178         import M2Crypto
179         if glo_passphrase_callback:
180             self.key = OpenSSL.crypto.load_privatekey(
181                 OpenSSL.crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string))
182             self.m2key = M2Crypto.EVP.load_key_string(
183                 string, functools.partial(glo_passphrase_callback, self, string))
184         else:
185             self.key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, string)
186             self.m2key = M2Crypto.EVP.load_key_string(string)
187
188     ##
189     #  Load the public key from a string. No private key is loaded.
190
191     def load_pubkey_from_file(self, filename):
192         import M2Crypto
193         # load the m2 public key
194         m2rsakey = M2Crypto.RSA.load_pub_key(filename)
195         self.m2key = M2Crypto.EVP.PKey()
196         self.m2key.assign_rsa(m2rsakey)
197
198         # create an m2 x509 cert
199         m2name = M2Crypto.X509.X509_Name()
200         m2name.add_entry_by_txt(field="CN", type=0x1001, entry="junk", len=-1, loc=-1, set=0)
201         m2x509 = M2Crypto.X509.X509()
202         m2x509.set_pubkey(self.m2key)
203         m2x509.set_serial_number(0)
204         m2x509.set_issuer_name(m2name)
205         m2x509.set_subject_name(m2name)
206         ASN1 = M2Crypto.ASN1.ASN1_UTCTIME()
207         ASN1.set_time(500)
208         m2x509.set_not_before(ASN1)
209         m2x509.set_not_after(ASN1)
210         # x509v3 so it can have extensions
211         # prob not necc since this cert itself is junk but still...
212         m2x509.set_version(2)
213         junk_key = Keypair(create=True)
214         m2x509.sign(pkey=junk_key.get_m2_pubkey(), md="sha1")
215
216         # convert the m2 x509 cert to a pyopenssl x509
217         m2pem = m2x509.as_pem()
218         pyx509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, m2pem)
219
220         # get the pyopenssl pkey from the pyopenssl x509
221         self.key = pyx509.get_pubkey()
222         self.filename = filename
223
224     ##
225     # Load the public key from a string. No private key is loaded.
226
227     def load_pubkey_from_string(self, string):
228         (f, fn) = tempfile.mkstemp()
229         os.write(f, string)
230         os.close(f)
231         self.load_pubkey_from_file(fn)
232         os.remove(fn)
233
234     ##
235     # Return the private key in PEM format.
236
237     def as_pem(self):
238         return OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, self.key)
239
240     ##
241     # Return an M2Crypto key object
242
243     def get_m2_pubkey(self):
244         import M2Crypto
245         if not self.m2key:
246             self.m2key = M2Crypto.EVP.load_key_string(self.as_pem())
247         return self.m2key
248
249     ##
250     # Returns a string containing the public key represented by this object.
251
252     def get_pubkey_string(self):
253         m2pkey = self.get_m2_pubkey()
254         return base64.b64encode(m2pkey.as_der())
255
256     ##
257     # Return an OpenSSL pkey object
258
259     def get_openssl_pkey(self):
260         return self.key
261
262     ##
263     # Given another Keypair object, return TRUE if the two keys are the same.
264
265     def is_same(self, pkey):
266         return self.as_pem() == pkey.as_pem()
267
268     def sign_string(self, data):
269         k = self.get_m2_pubkey()
270         k.sign_init()
271         k.sign_update(data)
272         return base64.b64encode(k.sign_final())
273
274     def verify_string(self, data, sig):
275         import M2Crypto
276         k = self.get_m2_pubkey()
277         k.verify_init()
278         k.verify_update(data)
279         return M2Crypto.m2.verify_final(k.ctx, base64.b64decode(sig), k.pkey)
280
281     def compute_hash(self, value):
282         return self.sign_string(str(value))
283
284     # only informative
285     def get_filename(self):
286         return getattr(self,'filename',None)
287
288     def dump(self, *args, **kwargs):
289         print(self.dump_string(*args, **kwargs))
290
291     def dump_string(self):
292         result =  ""
293         result += "KEYPAIR: pubkey={:>40}...".format(self.get_pubkey_string())
294         filename = self.get_filename()
295         if filename: result += "Filename {}\n".format(filename)
296         return result
297
298 ##
299 # The certificate class implements a general purpose X509 certificate, making
300 # use of the appropriate pyOpenSSL or M2Crypto abstractions. It also adds
301 # several addition features, such as the ability to maintain a chain of
302 # parent certificates, and storage of application-specific data.
303 #
304 # Certificates include the ability to maintain a chain of parents. Each
305 # certificate includes a pointer to it's parent certificate. When loaded
306 # from a file or a string, the parent chain will be automatically loaded.
307 # When saving a certificate to a file or a string, the caller can choose
308 # whether to save the parent certificates as well.
309
310 class Certificate:
311     digest = "sha256"
312
313 #    x509 = None
314 #    issuerKey = None
315 #    issuerSubject = None
316 #    parent = None
317     isCA = None # will be a boolean once set
318
319     separator = "-----parent-----"
320
321     ##
322     # Create a certificate object.
323     #
324     # @param lifeDays life of cert in days - default is 1825==5 years
325     # @param create If create==True, then also create a blank X509 certificate.
326     # @param subject If subject!=None, then create a blank certificate and set
327     #     it's subject name.
328     # @param string If string!=None, load the certficate from the string.
329     # @param filename If filename!=None, load the certficiate from the file.
330     # @param isCA If !=None, set whether this cert is for a CA
331
332     def __init__(self, lifeDays=1825, create=False, subject=None, string=None, filename=None, isCA=None):
333         # these used to be defined in the class !
334         self.x509 = None
335         self.issuerKey = None
336         self.issuerSubject = None
337         self.parent = None
338
339         self.data = {}
340         if create or subject:
341             self.create(lifeDays)
342         if subject:
343             self.set_subject(subject)
344         if string:
345             self.load_from_string(string)
346         if filename:
347             self.load_from_file(filename)
348
349         # Set the CA bit if a value was supplied
350         if isCA != None:
351             self.set_is_ca(isCA)
352
353     # Create a blank X509 certificate and store it in this object.
354
355     def create(self, lifeDays=1825):
356         self.x509 = OpenSSL.crypto.X509()
357         # FIXME: Use different serial #s
358         self.x509.set_serial_number(3)
359         self.x509.gmtime_adj_notBefore(0) # 0 means now
360         self.x509.gmtime_adj_notAfter(lifeDays*60*60*24) # five years is default
361         self.x509.set_version(2) # x509v3 so it can have extensions
362
363
364     ##
365     # Given a pyOpenSSL X509 object, store that object inside of this
366     # certificate object.
367
368     def load_from_pyopenssl_x509(self, x509):
369         self.x509 = x509
370
371     ##
372     # Load the certificate from a string
373
374     def load_from_string(self, string):
375         # if it is a chain of multiple certs, then split off the first one and
376         # load it (support for the ---parent--- tag as well as normal chained certs)
377
378         if string is None or string.strip() == "":
379             logger.warn("Empty string in load_from_string")
380             return
381
382         string = string.strip()
383         
384         # If it's not in proper PEM format, wrap it
385         if string.count('-----BEGIN CERTIFICATE') == 0:
386             string = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----'\
387                      .format(string)
388
389         # If there is a PEM cert in there, but there is some other text first
390         # such as the text of the certificate, skip the text
391         beg = string.find('-----BEGIN CERTIFICATE')
392         if beg > 0:
393             # skipping over non cert beginning                                                                                                              
394             string = string[beg:]
395
396         parts = []
397
398         if string.count('-----BEGIN CERTIFICATE-----') > 1 and \
399                string.count(Certificate.separator) == 0:
400             parts = string.split('-----END CERTIFICATE-----',1)
401             parts[0] += '-----END CERTIFICATE-----'
402         else:
403             parts = string.split(Certificate.separator, 1)
404
405         self.x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, parts[0])
406
407         if self.x509 is None:
408             logger.warn("Loaded from string but cert is None: {}".format(string))
409
410         # if there are more certs, then create a parent and let the parent load
411         # itself from the remainder of the string
412         if len(parts) > 1 and parts[1] != '':
413             self.parent = self.__class__()
414             self.parent.load_from_string(parts[1])
415
416     ##
417     # Load the certificate from a file
418
419     def load_from_file(self, filename):
420         file = open(filename)
421         string = file.read()
422         self.load_from_string(string)
423         self.filename = filename
424
425     ##
426     # Save the certificate to a string.
427     #
428     # @param save_parents If save_parents==True, then also save the parent certificates.
429
430     def save_to_string(self, save_parents=True):
431         if self.x509 is None:
432             logger.warn("None cert in certificate.save_to_string")
433             return ""
434         string = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, self.x509)
435         if isinstance(string, bytes):
436             string = string.decode()
437         if save_parents and self.parent:
438             string = string + self.parent.save_to_string(save_parents)
439         return string
440
441     ##
442     # Save the certificate to a file.
443     # @param save_parents If save_parents==True, then also save the parent certificates.
444
445     def save_to_file(self, filename, save_parents=True, filep=None):
446         string = self.save_to_string(save_parents=save_parents)
447         if filep:
448             f = filep
449         else:
450             f = open(filename, 'w')
451         if isinstance(string, bytes):
452             string = string.decode()
453         f.write(string)
454         f.close()
455         self.filename = filename
456
457     ##
458     # Save the certificate to a random file in /tmp/
459     # @param save_parents If save_parents==True, then also save the parent certificates.
460     def save_to_random_tmp_file(self, save_parents=True):
461         fp, filename = mkstemp(suffix='cert', text=True)
462         fp = os.fdopen(fp, "w")
463         self.save_to_file(filename, save_parents=True, filep=fp)
464         return filename
465
466     ##
467     # Sets the issuer private key and name
468     # @param key Keypair object containing the private key of the issuer
469     # @param subject String containing the name of the issuer
470     # @param cert (optional) Certificate object containing the name of the issuer
471
472     def set_issuer(self, key, subject=None, cert=None):
473         self.issuerKey = key
474         if subject:
475             # it's a mistake to use subject and cert params at the same time
476             assert(not cert)
477             if isinstance(subject, dict) or isinstance(subject, str):
478                 req = OpenSSL.crypto.X509Req()
479                 reqSubject = req.get_subject()
480                 if isinstance(subject, dict):
481                     for key in reqSubject.keys():
482                         setattr(reqSubject, key, subject[key])
483                 else:
484                     setattr(reqSubject, "CN", subject)
485                 subject = reqSubject
486                 # subject is not valid once req is out of scope, so save req
487                 self.issuerReq = req
488         if cert:
489             # if a cert was supplied, then get the subject from the cert
490             subject = cert.x509.get_subject()
491         assert(subject)
492         self.issuerSubject = subject
493
494     ##
495     # Get the issuer name
496
497     def get_issuer(self, which="CN"):
498         x = self.x509.get_issuer()
499         return getattr(x, which)
500
501     ##
502     # Set the subject name of the certificate
503
504     def set_subject(self, name):
505         req = OpenSSL.crypto.X509Req()
506         subj = req.get_subject()
507         if isinstance(name, dict):
508             for key in name.keys():
509                 setattr(subj, key, name[key])
510         else:
511             setattr(subj, "CN", name)
512         self.x509.set_subject(subj)
513
514     ##
515     # Get the subject name of the certificate
516
517     def get_subject(self, which="CN"):
518         x = self.x509.get_subject()
519         return getattr(x, which)
520
521     ##
522     # Get a pretty-print subject name of the certificate
523     # let's try to make this a little more usable as is makes logs hairy
524     # FIXME: Consider adding 'urn:publicid' and 'uuid' back for GENI?
525     pretty_fields = ['email']
526     def filter_chunk(self, chunk):
527         for field in self.pretty_fields:
528             if field in chunk:
529                 return " "+chunk
530
531     def pretty_cert(self):
532         message = "[Cert."
533         x = self.x509.get_subject()
534         ou = getattr(x, "OU")
535         if ou: message += " OU: {}".format(ou)
536         cn = getattr(x, "CN")
537         if cn: message += " CN: {}".format(cn)
538         data = self.get_data(field='subjectAltName')
539         if data:
540             message += " SubjectAltName:"
541             counter = 0
542             filtered = [self.filter_chunk(chunk) for chunk in data.split()]
543             message += " ".join( [f for f in filtered if f])
544             omitted = len([f for f in filtered if not f])
545             if omitted:
546                 message += "..+{} omitted".format(omitted)
547         message += "]"
548         return message
549
550     ##
551     # Get the public key of the certificate.
552     #
553     # @param key Keypair object containing the public key
554
555     def set_pubkey(self, key):
556         assert(isinstance(key, Keypair))
557         self.x509.set_pubkey(key.get_openssl_pkey())
558
559     ##
560     # Get the public key of the certificate.
561     # It is returned in the form of a Keypair object.
562
563     def get_pubkey(self):
564         import M2Crypto
565         m2x509 = M2Crypto.X509.load_cert_string(self.save_to_string())
566         pkey = Keypair()
567         pkey.key = self.x509.get_pubkey()
568         pkey.m2key = m2x509.get_pubkey()
569         return pkey
570
571     def set_intermediate_ca(self, val):
572         return self.set_is_ca(val)
573
574     # Set whether this cert is for a CA. All signers and only signers should be CAs.
575     # The local member starts unset, letting us check that you only set it once
576     # @param val Boolean indicating whether this cert is for a CA
577     def set_is_ca(self, val):
578         if val is None:
579             return
580
581         if self.isCA != None:
582             # Can't double set properties
583             raise Exception("Cannot set basicConstraints CA:?? more than once. "
584                             "Was {}, trying to set as {}%s".format(self.isCA, val))
585
586         self.isCA = val
587         if val:
588             self.add_extension('basicConstraints', 1, 'CA:TRUE')
589         else:
590             self.add_extension('basicConstraints', 1, 'CA:FALSE')
591
592
593
594     ##
595     # Add an X509 extension to the certificate. Add_extension can only be called
596     # once for a particular extension name, due to limitations in the underlying
597     # library.
598     #
599     # @param name string containing name of extension
600     # @param value string containing value of the extension
601
602     def add_extension(self, name, critical, value):
603         import M2Crypto
604         oldExtVal = None
605         try:
606             oldExtVal = self.get_extension(name)
607         except:
608             # M2Crypto LookupError when the extension isn't there (yet)
609             pass
610
611         # This code limits you from adding the extension with the same value
612         # The method comment says you shouldn't do this with the same name
613         # But actually it (m2crypto) appears to allow you to do this.
614         if oldExtVal and oldExtVal == value:
615             # don't add this extension again
616             # just do nothing as here
617             return
618         # FIXME: What if they are trying to set with a different value?
619         # Is this ever OK? Or should we raise an exception?
620 #        elif oldExtVal:
621 #            raise "Cannot add extension {} which had val {} with new val {}".format(name, oldExtVal, value)
622
623         ext = OpenSSL.crypto.X509Extension(name, critical, value)
624         self.x509.add_extensions([ext])
625
626     ##
627     # Get an X509 extension from the certificate
628
629     def get_extension(self, name):
630
631         import M2Crypto
632         if name is None:
633             return None
634
635         certstr = self.save_to_string()
636         if certstr is None or certstr == "":
637             return None
638         # pyOpenSSL does not have a way to get extensions
639         m2x509 = M2Crypto.X509.load_cert_string(certstr)
640         if m2x509 is None:
641             logger.warn("No cert loaded in get_extension")
642             return None
643         if m2x509.get_ext(name) is None:
644             return None
645         value = m2x509.get_ext(name).get_value()
646
647         return value
648
649     ##
650     # Set_data is a wrapper around add_extension. It stores the parameter str in
651     # the X509 subject_alt_name extension. Set_data can only be called once, due
652     # to limitations in the underlying library.
653
654     def set_data(self, str, field='subjectAltName'):
655         # pyOpenSSL only allows us to add extensions, so if we try to set the
656         # same extension more than once, it will not work
657         if field in self.data:
658             raise Exception("Cannot set {} more than once".format(field))
659         self.data[field] = str
660         self.add_extension(field, 0, str)
661
662     ##
663     # Return the data string that was previously set with set_data
664
665     def get_data(self, field='subjectAltName'):
666         if field in self.data:
667             return self.data[field]
668
669         try:
670             uri = self.get_extension(field)
671             self.data[field] = uri
672         except LookupError:
673             return None
674
675         return self.data[field]
676
677     ##
678     # Sign the certificate using the issuer private key and issuer subject previous set with set_issuer().
679
680     def sign(self):
681         logger.debug('certificate.sign')
682         assert self.x509 != None
683         assert self.issuerSubject != None
684         assert self.issuerKey != None
685         self.x509.set_issuer(self.issuerSubject)
686         self.x509.sign(self.issuerKey.get_openssl_pkey(), self.digest)
687
688     ##
689     # Verify the authenticity of a certificate.
690     # @param pkey is a Keypair object representing a public key. If Pkey
691     #     did not sign the certificate, then an exception will be thrown.
692
693     def verify(self, pubkey):
694         import M2Crypto
695         # pyOpenSSL does not have a way to verify signatures
696         m2x509 = M2Crypto.X509.load_cert_string(self.save_to_string())
697         m2pubkey = pubkey.get_m2_pubkey()
698         # verify it
699         # verify returns -1 or 0 on failure depending on how serious the
700         # error conditions are
701         return m2x509.verify(m2pubkey) == 1
702
703         # XXX alternatively, if openssl has been patched, do the much simpler:
704         # try:
705         #   self.x509.verify(pkey.get_openssl_key())
706         #   return 1
707         # except:
708         #   return 0
709
710     ##
711     # Return True if pkey is identical to the public key that is contained in the certificate.
712     # @param pkey Keypair object
713
714     def is_pubkey(self, pkey):
715         return self.get_pubkey().is_same(pkey)
716
717     ##
718     # Given a certificate cert, verify that this certificate was signed by the
719     # public key contained in cert. Throw an exception otherwise.
720     #
721     # @param cert certificate object
722
723     def is_signed_by_cert(self, cert):
724         k = cert.get_pubkey()
725         result = self.verify(k)
726         return result
727
728     ##
729     # Set the parent certficiate.
730     #
731     # @param p certificate object.
732
733     def set_parent(self, p):
734         self.parent = p
735
736     ##
737     # Return the certificate object of the parent of this certificate.
738
739     def get_parent(self):
740         return self.parent
741
742     ##
743     # Verification examines a chain of certificates to ensure that each parent
744     # signs the child, and that some certificate in the chain is signed by a
745     # trusted certificate.
746     #
747     # Verification is a basic recursion: <pre>
748     #     if this_certificate was signed by trusted_certs:
749     #         return
750     #     else
751     #         return verify_chain(parent, trusted_certs)
752     # </pre>
753     #
754     # At each recursion, the parent is tested to ensure that it did sign the
755     # child. If a parent did not sign a child, then an exception is thrown. If
756     # the bottom of the recursion is reached and the certificate does not match
757     # a trusted root, then an exception is thrown.
758     # Also require that parents are CAs.
759     #
760     # @param Trusted_certs is a list of certificates that are trusted.
761     #
762
763     def verify_chain(self, trusted_certs = None):
764         # Verify a chain of certificates. Each certificate must be signed by
765         # the public key contained in it's parent. The chain is recursed
766         # until a certificate is found that is signed by a trusted root.
767
768         # verify expiration time
769         if self.x509.has_expired():
770             if debug_verify_chain:
771                 logger.debug("verify_chain: NO, Certificate {} has expired"
772                              .format(self.pretty_cert()))
773             raise CertExpired(self.pretty_cert(), "client cert")
774
775         # if this cert is signed by a trusted_cert, then we are set
776         for trusted_cert in trusted_certs:
777             if self.is_signed_by_cert(trusted_cert):
778                 # verify expiration of trusted_cert ?
779                 if not trusted_cert.x509.has_expired():
780                     if debug_verify_chain:
781                         logger.debug("verify_chain: YES. Cert {} signed by trusted cert {}"
782                                      .format(self.pretty_cert(), trusted_cert.pretty_cert()))
783                     return trusted_cert
784                 else:
785                     if debug_verify_chain:
786                         logger.debug("verify_chain: NO. Cert {} is signed by trusted_cert {}, "
787                                      "but that signer is expired..."
788                                      .format(self.pretty_cert(),trusted_cert.pretty_cert()))
789                     raise CertExpired("{} signer trusted_cert {}"
790                                       .format(self.pretty_cert(), trusted_cert.pretty_cert()))
791
792         # if there is no parent, then no way to verify the chain
793         if not self.parent:
794             if debug_verify_chain:
795                 logger.debug("verify_chain: NO. {} has no parent "
796                              "and issuer {} is not in {} trusted roots"
797                              .format(self.pretty_cert(), self.get_issuer(), len(trusted_certs)))
798             raise CertMissingParent("{}: Issuer {} is not one of the {} trusted roots, "
799                                     "and cert has no parent."
800                                     .format(self.pretty_cert(), self.get_issuer(), len(trusted_certs)))
801
802         # if it wasn't signed by the parent...
803         if not self.is_signed_by_cert(self.parent):
804             if debug_verify_chain:
805                 logger.debug("verify_chain: NO. {} is not signed by parent {}, but by {}"
806                              .format(self.pretty_cert(),
807                                      self.parent.pretty_cert(),
808                                      self.get_issuer()))
809             raise CertNotSignedByParent("{}: Parent {}, issuer {}"
810                                         .format(self.pretty_cert(),
811                                                 self.parent.pretty_cert(),
812                                                 self.get_issuer()))
813
814         # Confirm that the parent is a CA. Only CAs can be trusted as
815         # signers.
816         # Note that trusted roots are not parents, so don't need to be
817         # CAs.
818         # Ugly - cert objects aren't parsed so we need to read the
819         # extension and hope there are no other basicConstraints
820         if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
821             logger.warn("verify_chain: cert {}'s parent {} is not a CA"
822                         .format(self.pretty_cert(), self.parent.pretty_cert()))
823             raise CertNotSignedByParent("{}: Parent {} not a CA"
824                                         .format(self.pretty_cert(), self.parent.pretty_cert()))
825
826         # if the parent isn't verified...
827         if debug_verify_chain:
828             logger.debug("verify_chain: .. {}, -> verifying parent {}"
829                          .format(self.pretty_cert(),self.parent.pretty_cert()))
830         self.parent.verify_chain(trusted_certs)
831
832         return
833
834     ### more introspection
835     def get_extensions(self):
836         import M2Crypto
837         # pyOpenSSL does not have a way to get extensions
838         triples = []
839         m2x509 = M2Crypto.X509.load_cert_string(self.save_to_string())
840         nb_extensions = m2x509.get_ext_count()
841         logger.debug("X509 had {} extensions".format(nb_extensions))
842         for i in range(nb_extensions):
843             ext = m2x509.get_ext_at(i)
844             triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) )
845         return triples
846
847     def get_data_names(self):
848         return self.data.keys()
849
850     def get_all_datas(self):
851         triples = self.get_extensions()
852         for name in self.get_data_names():
853             triples.append( (name,self.get_data(name),'data',) )
854         return triples
855
856     # only informative
857     def get_filename(self):
858         return getattr(self,'filename',None)
859
860     def dump(self, *args, **kwargs):
861         print(self.dump_string(*args, **kwargs))
862
863     def dump_string(self, show_extensions=False):
864         result = ""
865         result += "CERTIFICATE for {}\n".format(self.pretty_cert())
866         result += "Issued by {}\n".format(self.get_issuer())
867         filename = self.get_filename()
868         if filename:
869             result += "Filename {}\n".format(filename)
870         if show_extensions:
871             all_datas = self.get_all_datas()
872             result += " has {} extensions/data attached".format(len(all_datas))
873             for n, v, c in all_datas:
874                 if c == 'data':
875                     result += "   data: {}={}\n".format(n, v)
876                 else:
877                     result += "    ext: {} (crit={})=<<<{}>>>\n".format(n, c, v)
878         return result