d0d36d536ed235b87ff9236f11c5c8a1cc5da403
[sfa.git] / sfa / trust / certificate.py
1 # ----------------------------------------------------------------------
2 # Copyright (c) 2008 Board of Trustees, Princeton University
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining
5 # a copy of this software and/or hardware specification (the "Work") to
6 # deal in the Work without restriction, including without limitation the
7 # rights to use, copy, modify, merge, publish, distribute, sublicense,
8 # and/or sell copies of the Work, and to permit persons to whom the Work
9 # is furnished to do so, subject to the following conditions:
10 #
11 # The above copyright notice and this permission notice shall be
12 # included in all copies or substantial portions of the Work.
13 #
14 # THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
21 # IN THE WORK.
22 # ----------------------------------------------------------------------
23
24 #
25 # SFA uses two crypto libraries: pyOpenSSL and M2Crypto to implement
26 # the necessary crypto functionality. Ideally just one of these libraries
27 # would be used, but unfortunately each of these libraries is independently
28 # lacking. The pyOpenSSL library is missing many necessary functions, and
29 # the M2Crypto library has crashed inside of some of the functions. The
30 # design decision is to use pyOpenSSL whenever possible as it seems more
31 # stable, and only use M2Crypto for those functions that are not possible
32 # in pyOpenSSL.
33 #
34 # This module exports two classes: Keypair and Certificate.
35 ##
36 #
37
38 # Notes on using the openssl command line
39 #
40 # for verifying the chain in a gid,
41 # assuming it is split into pieces p1.pem p2.pem p3.pem
42 # you can use openssl to verify the chain using this command
43 # openssl verify -verbose -CAfile <(cat p2.pem p3.pem) p1.pem
44 # also you can use sfax509 to invoke openssl x509 on all parts of the gid
45 #
46
47
48
49
50 import functools
51 import os
52 import tempfile
53 import base64
54 from tempfile import mkstemp
55
56 import OpenSSL
57 # M2Crypto is imported on the fly to minimize crashes
58 # import M2Crypto
59
60 from sfa.util.faults import (CertExpired, CertMissingParent,
61                              CertNotSignedByParent)
62 from sfa.util.sfalogging import logger
63
64 # this tends to generate quite some logs for little or no value
65 debug_verify_chain = True
66
67 glo_passphrase_callback = None
68
69 ##
70 # A global callback may be implemented for requesting passphrases from the
71 # user. The function will be called with three arguments:
72 #
73 #    keypair_obj: the keypair object that is calling the passphrase
74 #    string: the string containing the private key that's being loaded
75 #    x: unknown, appears to be 0, comes from pyOpenSSL and/or m2crypto
76 #
77 # The callback should return a string containing the passphrase.
78
79
80 def set_passphrase_callback(callback_func):
81     global glo_passphrase_callback
82
83     glo_passphrase_callback = callback_func
84
85 ##
86 # Sets a fixed passphrase.
87
88
89 def set_passphrase(passphrase):
90     set_passphrase_callback(lambda k, s, x: passphrase)
91
92 ##
93 # Check to see if a passphrase works for a particular private key string.
94 # Intended to be used by passphrase callbacks for input validation.
95
96
97 def test_passphrase(string, passphrase):
98     try:
99         OpenSSL.crypto.load_privatekey(
100             OpenSSL.crypto.FILETYPE_PEM, string, (lambda x: passphrase))
101         return True
102     except:
103         return False
104
105
106 def convert_public_key(key):
107     keyconvert_path = "/usr/bin/keyconvert.py"
108     if not os.path.isfile(keyconvert_path):
109         raise IOError(
110             "Could not find keyconvert in {}".format(keyconvert_path))
111
112     # we can only convert rsa keys
113     if "ssh-dss" in key:
114         raise Exception("keyconvert: dss keys are not supported")
115
116     (ssh_f, ssh_fn) = tempfile.mkstemp()
117     ssl_fn = tempfile.mktemp()
118     os.write(ssh_f, key.encode())
119     os.close(ssh_f)
120
121     cmd = keyconvert_path + " " + ssh_fn + " " + ssl_fn
122     os.system(cmd)
123
124     # this check leaves the temporary file containing the public key so
125     # that it can be expected to see why it failed.
126     # TODO: for production, cleanup the temporary files
127     if not os.path.exists(ssl_fn):
128         raise Exception(
129             "generated certificate not found. keyconvert may have failed.")
130
131     k = Keypair()
132     try:
133         k.load_pubkey_from_file(ssl_fn)
134         return k
135     except:
136         logger.log_exc("convert_public_key caught exception")
137         raise
138     finally:
139         # remove the temporary files
140         if os.path.exists(ssh_fn):
141             os.remove(ssh_fn)
142         if os.path.exists(ssl_fn):
143             os.remove(ssl_fn)
144
145 ##
146 # Public-private key pairs are implemented by the Keypair class.
147 # A Keypair object may represent both a public and private key pair, or it
148 # may represent only a public key (this usage is consistent with OpenSSL).
149
150
151 class Keypair:
152     key = None       # public/private keypair
153     m2key = None     # public key (m2crypto format)
154
155     ##
156     # Creates a Keypair object
157     # @param create If create==True, creates a new public/private key and
158     #     stores it in the object
159     # @param string If string != None, load the keypair from the string (PEM)
160     # @param filename If filename != None, load the keypair from the file
161
162     def __init__(self, create=False, string=None, filename=None):
163         if create:
164             self.create()
165         if string:
166             self.load_from_string(string)
167         if filename:
168             self.load_from_file(filename)
169
170     ##
171     # Create a RSA public/private key pair and store it inside the keypair
172     # object
173
174     def create(self):
175         self.key = OpenSSL.crypto.PKey()
176         self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
177
178     ##
179     # Save the private key to a file
180     # @param filename name of file to store the keypair in
181
182     def save_to_file(self, filename):
183         with open(filename, 'wb') as output:
184             output.write(self.as_pem())
185         self.filename = filename
186
187     ##
188     # Load the private key from a file. Implicity the private key includes the
189     # public key.
190
191     def load_from_file(self, filename):
192         logger.info(f"opening {filename} from certficate.load_from_file")
193         self.filename = filename
194         buffer = open(filename, 'r').read()
195         self.load_from_string(buffer)
196
197     ##
198     # Load the private key from a string. Implicitly the private key includes
199     # the public key.
200
201     def load_from_string(self, string):
202         import M2Crypto
203         if glo_passphrase_callback:
204             self.key = OpenSSL.crypto.load_privatekey(
205                 OpenSSL.crypto.FILETYPE_PEM, string,
206                 functools.partial(glo_passphrase_callback, self, string))
207             self.m2key = M2Crypto.EVP.load_key_string(
208                 string.encode(encoding="utf-8"),
209                 functools.partial(glo_passphrase_callback, self, string))
210         else:
211             self.key = OpenSSL.crypto.load_privatekey(
212                 OpenSSL.crypto.FILETYPE_PEM, string)
213             self.m2key = M2Crypto.EVP.load_key_string(
214                 string.encode(encoding="utf-8"))
215
216     ##
217     #  Load the public key from a string. No private key is loaded.
218
219     def load_pubkey_from_file(self, filename):
220         import M2Crypto
221         # load the m2 public key
222         m2rsakey = M2Crypto.RSA.load_pub_key(filename)
223         self.m2key = M2Crypto.EVP.PKey()
224         self.m2key.assign_rsa(m2rsakey)
225
226         # create an m2 x509 cert
227         m2name = M2Crypto.X509.X509_Name()
228         m2name.add_entry_by_txt(field="CN", type=0x1001,
229                                 entry="junk", len=-1, loc=-1, set=0)
230         m2x509 = M2Crypto.X509.X509()
231         m2x509.set_pubkey(self.m2key)
232         m2x509.set_serial_number(0)
233         m2x509.set_issuer_name(m2name)
234         m2x509.set_subject_name(m2name)
235         ASN1 = M2Crypto.ASN1.ASN1_UTCTIME()
236         ASN1.set_time(500)
237         m2x509.set_not_before(ASN1)
238         m2x509.set_not_after(ASN1)
239         # x509v3 so it can have extensions
240         # prob not necc since this cert itself is junk but still...
241         m2x509.set_version(2)
242         junk_key = Keypair(create=True)
243         m2x509.sign(pkey=junk_key.get_m2_pubkey(), md="sha1")
244
245         # convert the m2 x509 cert to a pyopenssl x509
246         m2pem = m2x509.as_pem()
247         pyx509 = OpenSSL.crypto.load_certificate(
248             OpenSSL.crypto.FILETYPE_PEM, m2pem)
249
250         # get the pyopenssl pkey from the pyopenssl x509
251         self.key = pyx509.get_pubkey()
252         self.filename = filename
253
254     ##
255     # Load the public key from a string. No private key is loaded.
256
257     def load_pubkey_from_string(self, string):
258         (f, fn) = tempfile.mkstemp()
259         os.write(f, string)
260         os.close(f)
261         self.load_pubkey_from_file(fn)
262         os.remove(fn)
263
264     ##
265     # Return the private key in PEM format.
266
267     def as_pem(self):
268         return OpenSSL.crypto.dump_privatekey(
269             OpenSSL.crypto.FILETYPE_PEM, self.key)
270
271     ##
272     # Return an M2Crypto key object
273
274     def get_m2_pubkey(self):
275         import M2Crypto
276         if not self.m2key:
277             self.m2key = M2Crypto.EVP.load_key_string(self.as_pem())
278         return self.m2key
279
280     ##
281     # Returns a string containing the public key represented by this object.
282
283     def get_pubkey_string(self):
284         m2pkey = self.get_m2_pubkey()
285         return base64.b64encode(m2pkey.as_der())
286
287     ##
288     # Return an OpenSSL pkey object
289
290     def get_openssl_pkey(self):
291         return self.key
292
293     ##
294     # Given another Keypair object, return TRUE if the two keys are the same.
295
296     def is_same(self, pkey):
297         return self.as_pem() == pkey.as_pem()
298
299     def sign_string(self, data):
300         k = self.get_m2_pubkey()
301         k.sign_init()
302         k.sign_update(data)
303         return base64.b64encode(k.sign_final())
304
305     def verify_string(self, data, sig):
306         import M2Crypto
307         k = self.get_m2_pubkey()
308         k.verify_init()
309         k.verify_update(data)
310         return M2Crypto.m2.verify_final(k.ctx, base64.b64decode(sig), k.pkey)
311
312     def compute_hash(self, value):
313         return self.sign_string(str(value))
314
315     # only informative
316     def get_filename(self):
317         return getattr(self, 'filename', None)
318
319     def dump(self, *args, **kwargs):
320         print(self.dump_string(*args, **kwargs))
321
322     def dump_string(self):
323         result = ""
324         result += "KEYPAIR: pubkey={:>40}...".format(self.get_pubkey_string())
325         filename = self.get_filename()
326         if filename:
327             result += "Filename {}\n".format(filename)
328         return result
329
330 ##
331 # The certificate class implements a general purpose X509 certificate, making
332 # use of the appropriate pyOpenSSL or M2Crypto abstractions. It also adds
333 # several addition features, such as the ability to maintain a chain of
334 # parent certificates, and storage of application-specific data.
335 #
336 # Certificates include the ability to maintain a chain of parents. Each
337 # certificate includes a pointer to it's parent certificate. When loaded
338 # from a file or a string, the parent chain will be automatically loaded.
339 # When saving a certificate to a file or a string, the caller can choose
340 # whether to save the parent certificates as well.
341
342
343 class Certificate:
344     digest = "sha256"
345
346 #    x509 = None
347 #    issuerKey = None
348 #    issuerSubject = None
349 #    parent = None
350     isCA = None  # will be a boolean once set
351
352     separator = "-----parent-----"
353
354     ##
355     # Create a certificate object.
356     #
357     # @param lifeDays life of cert in days - default is 1825==5 years
358     # @param create If create==True, then also create a blank X509 certificate.
359     # @param subject If subject!=None, then create a blank certificate and set
360     #     it's subject name.
361     # @param string If string!=None, load the certficate from the string.
362     # @param filename If filename!=None, load the certficiate from the file.
363     # @param isCA If !=None, set whether this cert is for a CA
364
365     def __init__(self, lifeDays=1825, create=False, subject=None, string=None,
366                  filename=None, isCA=None):
367         # these used to be defined in the class !
368         self.x509 = None
369         self.issuerKey = None
370         self.issuerSubject = None
371         self.parent = None
372
373         self.data = {}
374         if create or subject:
375             self.create(lifeDays)
376         if subject:
377             self.set_subject(subject)
378         if string:
379             self.load_from_string(string)
380         if filename:
381             self.load_from_file(filename)
382
383         # Set the CA bit if a value was supplied
384         if isCA is not None:
385             self.set_is_ca(isCA)
386
387     # Create a blank X509 certificate and store it in this object.
388
389     def create(self, lifeDays=1825):
390         self.x509 = OpenSSL.crypto.X509()
391         # FIXME: Use different serial #s
392         self.x509.set_serial_number(3)
393         self.x509.gmtime_adj_notBefore(0)  # 0 means now
394         self.x509.gmtime_adj_notAfter(
395             lifeDays * 60 * 60 * 24)  # five years is default
396         self.x509.set_version(2)  # x509v3 so it can have extensions
397
398     ##
399     # Given a pyOpenSSL X509 object, store that object inside of this
400     # certificate object.
401
402     def load_from_pyopenssl_x509(self, x509):
403         self.x509 = x509
404
405     ##
406     # Load the certificate from a string
407
408     def load_from_string(self, string):
409         # if it is a chain of multiple certs, then split off the first one and
410         # load it (support for the ---parent--- tag as well as normal chained
411         # certs)
412
413         if string is None or string.strip() == "":
414             logger.warning("Empty string in load_from_string")
415             return
416
417         string = string.strip()
418
419         # If it's not in proper PEM format, wrap it
420         if string.count('-----BEGIN CERTIFICATE') == 0:
421             string = '-----BEGIN CERTIFICATE-----'\
422                      '\n{}\n-----END CERTIFICATE-----'\
423                      .format(string)
424
425         # If there is a PEM cert in there, but there is some other text first
426         # such as the text of the certificate, skip the text
427         beg = string.find('-----BEGIN CERTIFICATE')
428         if beg > 0:
429             # skipping over non cert beginning
430             string = string[beg:]
431
432         parts = []
433
434         if string.count('-----BEGIN CERTIFICATE-----') > 1 and \
435                 string.count(Certificate.separator) == 0:
436             parts = string.split('-----END CERTIFICATE-----', 1)
437             parts[0] += '-----END CERTIFICATE-----'
438         else:
439             parts = string.split(Certificate.separator, 1)
440
441         self.x509 = OpenSSL.crypto.load_certificate(
442             OpenSSL.crypto.FILETYPE_PEM, parts[0])
443
444         if self.x509 is None:
445             logger.warning(
446                 "Loaded from string but cert is None: {}".format(string))
447
448         # if there are more certs, then create a parent and let the parent load
449         # itself from the remainder of the string
450         if len(parts) > 1 and parts[1] != '':
451             self.parent = self.__class__()
452             self.parent.load_from_string(parts[1])
453
454     ##
455     # Load the certificate from a file
456
457     def load_from_file(self, filename):
458         file = open(filename)
459         string = file.read()
460         self.load_from_string(string)
461         self.filename = filename
462
463     ##
464     # Save the certificate to a string.
465     #
466     # @param save_parents If save_parents==True,
467     # then also save the parent certificates.
468
469     def save_to_string(self, save_parents=True):
470         if self.x509 is None:
471             logger.warning("None cert in certificate.save_to_string")
472             return ""
473         string = OpenSSL.crypto.dump_certificate(
474             OpenSSL.crypto.FILETYPE_PEM, self.x509)
475         if isinstance(string, bytes):
476             string = string.decode()
477         if save_parents and self.parent:
478             string = string + self.parent.save_to_string(save_parents)
479         return string
480
481     ##
482     # Save the certificate to a file.
483     # @param save_parents If save_parents==True,
484     #  then also save the parent certificates.
485
486     def save_to_file(self, filename, save_parents=True, filep=None):
487         string = self.save_to_string(save_parents=save_parents)
488         if filep:
489             f = filep
490         else:
491             f = open(filename, 'w')
492         if isinstance(string, bytes):
493             string = string.decode()
494         f.write(string)
495         f.close()
496         self.filename = filename
497
498     ##
499     # Save the certificate to a random file in /tmp/
500     # @param save_parents If save_parents==True,
501     #  then also save the parent certificates.
502     def save_to_random_tmp_file(self, save_parents=True):
503         fp, filename = mkstemp(suffix='cert', text=True)
504         fp = os.fdopen(fp, "w")
505         self.save_to_file(filename, save_parents=True, filep=fp)
506         return filename
507
508     ##
509     # Sets the issuer private key and name
510     # @param key Keypair object containing the private key of the issuer
511     # @param subject String containing the name of the issuer
512     # @param cert (optional)
513     #  Certificate object containing the name of the issuer
514
515     def set_issuer(self, key, subject=None, cert=None):
516         self.issuerKey = key
517         if subject:
518             # it's a mistake to use subject and cert params at the same time
519             assert(not cert)
520             if isinstance(subject, dict) or isinstance(subject, str):
521                 req = OpenSSL.crypto.X509Req()
522                 reqSubject = req.get_subject()
523                 if isinstance(subject, dict):
524                     for key in list(reqSubject.keys()):
525                         setattr(reqSubject, key, subject[key])
526                 else:
527                     setattr(reqSubject, "CN", subject)
528                 subject = reqSubject
529                 # subject is not valid once req is out of scope, so save req
530                 self.issuerReq = req
531         if cert:
532             # if a cert was supplied, then get the subject from the cert
533             subject = cert.x509.get_subject()
534         assert(subject)
535         self.issuerSubject = subject
536
537     ##
538     # Get the issuer name
539
540     def get_issuer(self, which="CN"):
541         x = self.x509.get_issuer()
542         return getattr(x, which)
543
544     ##
545     # Set the subject name of the certificate
546
547     def set_subject(self, name):
548         req = OpenSSL.crypto.X509Req()
549         subj = req.get_subject()
550         if isinstance(name, dict):
551             for key in list(name.keys()):
552                 setattr(subj, key, name[key])
553         else:
554             setattr(subj, "CN", name)
555         self.x509.set_subject(subj)
556
557     ##
558     # Get the subject name of the certificate
559
560     def get_subject(self, which="CN"):
561         x = self.x509.get_subject()
562         return getattr(x, which)
563
564     ##
565     # Get a pretty-print subject name of the certificate
566     # let's try to make this a little more usable as is makes logs hairy
567     # FIXME: Consider adding 'urn:publicid' and 'uuid' back for GENI?
568     pretty_fields = ['email']
569
570     def filter_chunk(self, chunk):
571         for field in self.pretty_fields:
572             if field in chunk:
573                 return " " + chunk
574
575     def pretty_cert(self):
576         message = "[Cert."
577         x = self.x509.get_subject()
578         ou = getattr(x, "OU")
579         if ou:
580             message += " OU: {}".format(ou)
581         cn = getattr(x, "CN")
582         if cn:
583             message += " CN: {}".format(cn)
584         data = self.get_data(field='subjectAltName')
585         if data:
586             message += " SubjectAltName:"
587             filtered = [self.filter_chunk(chunk) for chunk in data.split()]
588             message += " ".join([f for f in filtered if f])
589             omitted = len([f for f in filtered if not f])
590             if omitted:
591                 message += "..+{} omitted".format(omitted)
592         message += "]"
593         return message
594
595     def pretty_chain(self):
596         message = "{}".format(self.x509.get_subject())
597         parent = self.parent
598         while parent:
599             message += "->{}".format(parent.x509.get_subject())
600             parent = parent.parent
601         return message
602
603     def pretty_name(self):
604         return self.get_filename() or self.pretty_chain()
605
606     ##
607     # Get the public key of the certificate.
608     #
609     # @param key Keypair object containing the public key
610
611     def set_pubkey(self, key):
612         assert(isinstance(key, Keypair))
613         self.x509.set_pubkey(key.get_openssl_pkey())
614
615     ##
616     # Get the public key of the certificate.
617     # It is returned in the form of a Keypair object.
618
619     def get_pubkey(self):
620         import M2Crypto
621         m2x509 = M2Crypto.X509.load_cert_string(self.save_to_string())
622         pkey = Keypair()
623         pkey.key = self.x509.get_pubkey()
624         pkey.m2key = m2x509.get_pubkey()
625         return pkey
626
627     def set_intermediate_ca(self, val):
628         return self.set_is_ca(val)
629
630     # Set whether this cert is for a CA.
631     # All signers and only signers should be CAs.
632     # The local member starts unset, letting us check that you only set it once
633     # @param val Boolean indicating whether this cert is for a CA
634     def set_is_ca(self, val):
635         if val is None:
636             return
637
638         if self.isCA is not None:
639             # Can't double set properties
640             raise Exception(
641                 "Cannot set basicConstraints CA:?? more than once. "
642                 "Was {}, trying to set as {}".format(self.isCA, val))
643
644         self.isCA = val
645         if val:
646             self.add_extension('basicConstraints', 1, 'CA:TRUE')
647         else:
648             self.add_extension('basicConstraints', 1, 'CA:FALSE')
649
650     ##
651     # Add an X509 extension to the certificate. Add_extension can only
652     # be called once for a particular extension name, due to
653     # limitations in the underlying library.
654     #
655     # @param name string containing name of extension
656     # @param value string containing value of the extension
657
658     def add_extension(self, name, critical, value):
659         oldExtVal = None
660         try:
661             oldExtVal = self.get_extension(name)
662         except:
663             # M2Crypto LookupError when the extension isn't there (yet)
664             pass
665
666         # This code limits you from adding the extension with the same value
667         # The method comment says you shouldn't do this with the same name
668         # But actually it (m2crypto) appears to allow you to do this.
669         if oldExtVal and oldExtVal == value:
670             # don't add this extension again
671             # just do nothing as here
672             return
673         # FIXME: What if they are trying to set with a different value?
674         # Is this ever OK? Or should we raise an exception?
675 #        elif oldExtVal:
676 #            raise "Cannot add extension {} which had val {} with new val {}"\
677 #                  .format(name, oldExtVal, value)
678
679         if isinstance(name, str):
680             name = name.encode()
681         if isinstance(value, str):
682             value = value.encode()
683
684         ext = OpenSSL.crypto.X509Extension(name, critical, value)
685         self.x509.add_extensions([ext])
686
687     ##
688     # Get an X509 extension from the certificate
689
690     def get_extension(self, name):
691
692         import M2Crypto
693         if name is None:
694             return None
695
696         certstr = self.save_to_string()
697         if certstr is None or certstr == "":
698             return None
699         # pyOpenSSL does not have a way to get extensions
700         m2x509 = M2Crypto.X509.load_cert_string(certstr)
701         if m2x509 is None:
702             logger.warning("No cert loaded in get_extension")
703             return None
704         if m2x509.get_ext(name) is None:
705             return None
706         value = m2x509.get_ext(name).get_value()
707
708         return value
709
710     ##
711     # Set_data is a wrapper around add_extension. It stores the
712     # parameter str in the X509 subject_alt_name extension. Set_data
713     # can only be called once, due to limitations in the underlying
714     # library.
715
716     def set_data(self, string, field='subjectAltName'):
717         # pyOpenSSL only allows us to add extensions, so if we try to set the
718         # same extension more than once, it will not work
719         if field in self.data:
720             raise Exception("Cannot set {} more than once".format(field))
721         self.data[field] = string
722         # call str() because we've seen unicode there
723         # and the underlying C code doesn't like it
724         self.add_extension(field, 0, str(string))
725
726     ##
727     # Return the data string that was previously set with set_data
728
729     def get_data(self, field='subjectAltName'):
730         if field in self.data:
731             return self.data[field]
732
733         try:
734             uri = self.get_extension(field)
735             self.data[field] = uri
736         except LookupError:
737             return None
738
739         return self.data[field]
740
741     ##
742     # Sign the certificate using the issuer private key and issuer subject
743     # previous set with set_issuer().
744
745     def sign(self):
746         logger.debug('certificate.sign')
747         assert self.x509 is not None
748         assert self.issuerSubject is not None
749         assert self.issuerKey is not None
750         self.x509.set_issuer(self.issuerSubject)
751         self.x509.sign(self.issuerKey.get_openssl_pkey(), self.digest)
752
753     ##
754     # Verify the authenticity of a certificate.
755     # @param pkey is a Keypair object representing a public key. If Pkey
756     #     did not sign the certificate, then an exception will be thrown.
757
758     def verify(self, pubkey):
759         import M2Crypto
760         # pyOpenSSL does not have a way to verify signatures
761         m2x509 = M2Crypto.X509.load_cert_string(self.save_to_string())
762         m2pubkey = pubkey.get_m2_pubkey()
763         # verify it
764         # https://www.openssl.org/docs/man1.1.0/crypto/X509_verify.html
765         # verify returns
766         # 1  if it checks out
767         # 0  if if does not
768         # -1 if it could not be checked 'for some reason'
769         m2result = m2x509.verify(m2pubkey)
770         result = m2result == 1
771         if debug_verify_chain:
772             logger.debug("Certificate.verify: <- {} (m2={}) ({} x {})"
773                          .format(result, m2result,
774                                  self.pretty_cert(), m2pubkey))
775         return result
776
777         # XXX alternatively, if openssl has been patched, do the much simpler:
778         # try:
779         #   self.x509.verify(pkey.get_openssl_key())
780         #   return 1
781         # except:
782         #   return 0
783
784     ##
785     # Return True if pkey is identical to the public key that is
786     # contained in the certificate.
787     # @param pkey Keypair object
788
789     def is_pubkey(self, pkey):
790         return self.get_pubkey().is_same(pkey)
791
792     ##
793     # Given a certificate cert, verify that this certificate was signed by the
794     # public key contained in cert. Throw an exception otherwise.
795     #
796     # @param cert certificate object
797
798     def is_signed_by_cert(self, cert):
799         key = cert.get_pubkey()
800         logger.debug("Certificate.is_signed_by_cert -> verify on {}\n"
801                      "with pubkey {}"
802                      .format(self, key))
803         result = self.verify(key)
804         return result
805
806     ##
807     # Set the parent certficiate.
808     #
809     # @param p certificate object.
810
811     def set_parent(self, p):
812         self.parent = p
813
814     ##
815     # Return the certificate object of the parent of this certificate.
816
817     def get_parent(self):
818         return self.parent
819
820     ##
821     # Verification examines a chain of certificates to ensure that each parent
822     # signs the child, and that some certificate in the chain is signed by a
823     # trusted certificate.
824     #
825     # Verification is a basic recursion: <pre>
826     #     if this_certificate was signed by trusted_certs:
827     #         return
828     #     else
829     #         return verify_chain(parent, trusted_certs)
830     # </pre>
831     #
832     # At each recursion, the parent is tested to ensure that it did sign the
833     # child. If a parent did not sign a child, then an exception is thrown. If
834     # the bottom of the recursion is reached and the certificate does not match
835     # a trusted root, then an exception is thrown.
836     # Also require that parents are CAs.
837     #
838     # @param Trusted_certs is a list of certificates that are trusted.
839     #
840
841     def verify_chain(self, trusted_certs=None):
842         # Verify a chain of certificates. Each certificate must be signed by
843         # the public key contained in it's parent. The chain is recursed
844         # until a certificate is found that is signed by a trusted root.
845
846         # verify expiration time
847         if self.x509.has_expired():
848             if debug_verify_chain:
849                 logger.debug("verify_chain: NO, Certificate {} has expired"
850                              .format(self.pretty_cert()))
851             raise CertExpired(self.pretty_cert(), "client cert")
852
853         # if this cert is signed by a trusted_cert, then we are set
854         for i, trusted_cert in enumerate(trusted_certs, 1):
855             logger.debug(5*'-' +
856                          " Certificate.verify_chain - trying trusted #{} : {}"
857                          .format(i, trusted_cert.pretty_name()))
858             if self.is_signed_by_cert(trusted_cert):
859                 # verify expiration of trusted_cert ?
860                 if not trusted_cert.x509.has_expired():
861                     if debug_verify_chain:
862                         logger.debug("verify_chain: YES."
863                                      " Cert {} signed by trusted cert {}"
864                                      .format(self.pretty_name(),
865                                              trusted_cert.pretty_name()))
866                     return trusted_cert
867                 else:
868                     if debug_verify_chain:
869                         logger.debug("verify_chain: NO. Cert {} "
870                                      "is signed by trusted_cert {}, "
871                                      "but that signer is expired..."
872                                      .format(self.pretty_cert(),
873                                              trusted_cert.pretty_cert()))
874                     raise CertExpired("{} signer trusted_cert {}"
875                                       .format(self.pretty_name(),
876                                               trusted_cert.pretty_name()))
877             else:
878                 logger.debug("verify_chain: not a direct"
879                              " descendant of trusted root #{}".format(i))
880
881         # if there is no parent, then no way to verify the chain
882         if not self.parent:
883             if debug_verify_chain:
884                 logger.debug("verify_chain: NO. {} has no parent "
885                              "and issuer {} is not in {} trusted roots"
886                              .format(self.pretty_name(), self.get_issuer(),
887                                      len(trusted_certs)))
888             raise CertMissingParent("{}: Issuer {} is not "
889                                     "one of the {} trusted roots, "
890                                     "and cert has no parent."
891                                     .format(self.pretty_name(),
892                                             self.get_issuer(),
893                                             len(trusted_certs)))
894
895         # if it wasn't signed by the parent...
896         if not self.is_signed_by_cert(self.parent):
897             if debug_verify_chain:
898                 logger.debug("verify_chain: NO. {} is not signed by parent {}"
899                              .format(self.pretty_name(),
900                                      self.parent.pretty_name()))
901                 self.save_to_file("/tmp/xxx-capture.pem", save_parents=True)
902             raise CertNotSignedByParent("{}: Parent {}, issuer {}"
903                                         .format(self.pretty_name(),
904                                                 self.parent.pretty_name(),
905                                                 self.get_issuer()))
906
907         # Confirm that the parent is a CA. Only CAs can be trusted as
908         # signers.
909         # Note that trusted roots are not parents, so don't need to be
910         # CAs.
911         # Ugly - cert objects aren't parsed so we need to read the
912         # extension and hope there are no other basicConstraints
913         if not self.parent.isCA and not (
914                 self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
915             logger.warning("verify_chain: cert {}'s parent {} is not a CA"
916                             .format(self.pretty_name(), self.parent.pretty_name()))
917             raise CertNotSignedByParent("{}: Parent {} not a CA"
918                                         .format(self.pretty_name(),
919                                                 self.parent.pretty_name()))
920
921         # if the parent isn't verified...
922         if debug_verify_chain:
923             logger.debug("verify_chain: .. {}, -> verifying parent {}"
924                          .format(self.pretty_name(),
925                                  self.parent.pretty_name()))
926         self.parent.verify_chain(trusted_certs)
927
928         return
929
930     # more introspection
931     def get_extensions(self):
932         import M2Crypto
933         # pyOpenSSL does not have a way to get extensions
934         triples = []
935         m2x509 = M2Crypto.X509.load_cert_string(self.save_to_string())
936         nb_extensions = m2x509.get_ext_count()
937         logger.debug("X509 had {} extensions".format(nb_extensions))
938         for i in range(nb_extensions):
939             ext = m2x509.get_ext_at(i)
940             triples.append(
941                 (ext.get_name(), ext.get_value(), ext.get_critical(),))
942         return triples
943
944     def get_data_names(self):
945         return list(self.data.keys())
946
947     def get_all_datas(self):
948         triples = self.get_extensions()
949         for name in self.get_data_names():
950             triples.append((name, self.get_data(name), 'data',))
951         return triples
952
953     # only informative
954     def get_filename(self):
955         return getattr(self, 'filename', None)
956
957     def dump(self, *args, **kwargs):
958         print(self.dump_string(*args, **kwargs))
959
960     def dump_string(self, show_extensions=False):
961         result = ""
962         result += "CERTIFICATE for {}\n".format(self.pretty_cert())
963         result += "Issued by {}\n".format(self.get_issuer())
964         filename = self.get_filename()
965         if filename:
966             result += "Filename {}\n".format(filename)
967         if show_extensions:
968             all_datas = self.get_all_datas()
969             result += " has {} extensions/data attached".format(len(all_datas))
970             for n, v, c in all_datas:
971                 if c == 'data':
972                     result += "   data: {}={}\n".format(n, v)
973                 else:
974                     result += "    ext: {} (crit={})=<<<{}>>>\n".format(n, c, v)
975         return result