3 # a general purpose class for dealing with certificates
5 # this class serves as an interface between a lower-level X.509 certificate
6 # library such as pyOpenSSL or M2Crypto. Currently both of these libraries
7 # are being used due to lack of functionality in pyOpenSSL and some apparant
10 from OpenSSL import crypto
12 from M2Crypto import X509
13 from M2Crypto import EVP
17 # represents a private/public key pair, or a public key
20 key = None # public/private keypair
21 m2key = None # public key (m2crypto format)
23 def __init__(self, create=False, string=None, filename=None):
27 self.load_from_string(string)
29 self.load_from_file(filename)
32 self.key = crypto.PKey()
33 self.key.generate_key(crypto.TYPE_RSA, 1024)
35 def save_to_file(self, filename):
36 open(filename, 'w').write(self.as_pem())
38 def load_from_file(self, filename):
39 buffer = open(filename, 'r').read()
40 self.load_from_string(buffer)
42 def load_from_string(self, string):
43 self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string)
44 self.m2key = M2Crypto.EVP.load_key_string(string)
47 return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.key)
49 def get_m2_pkey(self):
51 self.m2key = M2Crypto.EVP.load_key_string(self.as_pem())
54 def get_openssl_pkey(self):
57 def is_same(self, pkey):
58 return self.as_pem() == pkey.as_pem()
62 # Represents an X.509 certificate. Support is included for a list of
63 # certificates by use of a "parent" member. See load_from_string() and
64 # save_to_string() for insight into how a recursive chain of certs is
67 # Certificates support an application-defined "data" field, which is
68 # stored in the subjectAltName field of the X.509 certificate.
79 def __init__(self, create=False, subject=None, string=None, filename=None):
83 self.set_subject(subject)
85 self.load_from_string(string)
87 self.load_from_file(filename)
90 self.cert = crypto.X509()
91 self.cert.set_serial_number(1)
92 self.cert.gmtime_adj_notBefore(0)
93 self.cert.gmtime_adj_notAfter(60*60*24*365*5) # five years
95 def load_from_pyopenssl_x509(self, x509):
98 def load_from_string(self, string):
99 # if it is a chain of multiple certs, then split off the first one and
101 parts = string.split("-----parent-----", 1)
102 self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, parts[0])
104 # if there are more certs, then create a parent and let the parent load
105 # itself from the remainder of the string
107 self.parent = Certificate()
108 self.parent.load_from_string(parts[1])
111 def load_from_file(self, filename):
112 file = open(filename)
114 self.load_from_string(string)
116 def save_to_string(self, save_parents=False):
117 string = crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert)
118 if save_parents and self.parent:
119 string = string + "-----parent-----" + self.parent.save_to_string(save_parents)
122 def save_to_file(self, filename, save_parents=False):
123 string = self.save_to_string(save_parents=save_parents)
124 open(filename, 'w').write(string)
126 def set_issuer(self, key, subject=None, cert=None):
129 # it's a mistake to use subject and cert params at the same time
131 if isinstance(subject, dict) or isinstance(subject, str):
132 req = crypto.X509Req()
133 reqSubject = req.get_subject()
134 if (isinstance(subject, dict)):
135 for key in reqSubject.keys():
136 setattr(reqSubject, key, name[key])
138 setattr(reqSubject, "CN", subject)
140 # subject is not valid once req is out of scope, so save req
143 # if a cert was supplied, then get the subject from the cert
144 subject = cert.cert.get_issuer()
146 self.issuerSubject = subject
148 def get_issuer(self, which="CN"):
149 x = self.cert.get_issuer()
150 return getattr(x, which)
152 def set_subject(self, name):
153 req = crypto.X509Req()
154 subj = req.get_subject()
155 if (isinstance(name, dict)):
156 for key in name.keys():
157 setattr(subj, key, name[key])
159 setattr(subj, "CN", name)
160 self.cert.set_subject(subj)
162 def get_subject(self, which="CN"):
163 x = self.cert.get_subject()
164 return getattr(x, which)
166 def set_pubkey(self, key):
167 assert(isinstance(key, Keypair))
168 self.cert.set_pubkey(key.get_openssl_pkey())
170 def get_pubkey(self):
171 m2x509 = X509.load_cert_string(self.save_to_string())
173 pkey.key = self.cert.get_pubkey()
174 pkey.m2key = m2x509.get_pubkey()
177 def add_extension(self, name, critical, value):
178 ext = crypto.X509Extension (name, critical, value)
179 self.cert.add_extensions([ext])
181 def get_extension(self, name):
182 # pyOpenSSL does not have a way to get extensions
183 m2x509 = X509.load_cert_string(self.save_to_string())
184 value = m2x509.get_ext(name).get_value()
187 def set_data(self, str):
188 # pyOpenSSL only allows us to add extensions, so if we try to set the
189 # same extension more than once, it will not work
190 if self.data != None:
191 raise "cannot set subjectAltName more than once"
193 self.add_extension("subjectAltName", 0, "URI:http://" + str)
200 uri = self.get_extension("subjectAltName")
205 if not uri.startswith("URI:http://"):
206 raise "bad encoding in subjectAltName"
211 assert self.cert != None
212 assert self.issuerSubject != None
213 assert self.issuerKey != None
214 self.cert.set_issuer(self.issuerSubject)
215 self.cert.sign(self.issuerKey.get_openssl_pkey(), self.digest)
217 def verify(self, pkey):
218 # pyOpenSSL does not have a way to verify signatures
219 m2x509 = X509.load_cert_string(self.save_to_string())
220 m2pkey = pkey.get_m2_pkey()
222 return m2x509.verify(m2pkey)
224 # XXX alternatively, if openssl has been patched, do the much simpler:
226 # self.cert.verify(pkey.get_openssl_key())
231 def is_pubkey(self, pkey):
232 return self.get_pubkey().is_same(pkey)
234 def is_signed_by_cert(self, cert):
235 k = cert.get_pubkey()
236 result = self.verify(k)
239 def set_parent(self, p):
242 def get_parent(self):
245 def verify_chain(self, trusted_certs = None):
246 # Verify a chain of certificates. Each certificate must be signed by
247 # the public key contained in it's parent. The chain is recursed
248 # until a certificate is found that is signed by a trusted root.
250 # TODO: verify expiration time
252 # if this cert is signed by a trusted_cert, then we are set
253 for trusted_cert in trusted_certs:
254 # TODO: verify expiration of trusted_cert ?
255 if self.is_signed_by_cert(trusted_cert):
256 #print self.get_subject(), "is signed by a root"
259 # if there is no parent, then no way to verify the chain
261 #print self.get_subject(), "has no parent"
264 # if it wasn't signed by the parent...
265 if not self.is_signed_by_cert(self.parent):
266 #print self.get_subject(), "is not signed by parent"
269 # if the parent isn't verified...
270 if not self.parent.verify_chain(trusted_certs):
271 #print self.get_subject(), "parent does not verify"