added some comments
[sfa.git] / util / cert.py
1 # cert.py
2 #
3 # a general purpose class for dealing with certificates
4 #
5 # this class serves as an interface between a lower-level X.509 certificate
6 # library such as pyOpenSSL or M2Crypto. Currently both of these libraries
7 # are being used due to lack of functionality in pyOpenSSL and some apparant
8 # bugs in M2Crypto
9
10 from OpenSSL import crypto
11 import M2Crypto
12 from M2Crypto import X509
13 from M2Crypto import EVP
14
15 # Keypair
16 #
17 # represents a private/public key pair, or a public key
18
19 class Keypair:
20    key = None       # public/private keypair
21    m2key = None     # public key (m2crypto format)
22
23    def __init__(self, create=False, string=None, filename=None):
24       if create:
25          self.create()
26       if string:
27          self.load_from_string(string)
28       if filename:
29          self.load_from_file(filename)
30
31    def create(self):
32       self.key = crypto.PKey()
33       self.key.generate_key(crypto.TYPE_RSA, 1024)
34
35    def save_to_file(self, filename):
36       open(filename, 'w').write(self.as_pem())
37
38    def load_from_file(self, filename):
39       buffer = open(filename, 'r').read()
40       self.load_from_string(buffer)
41
42    def load_from_string(self, string):
43       self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string)
44       self.m2key = M2Crypto.EVP.load_key_string(string)
45
46    def as_pem(self):
47       return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.key)
48
49    def get_m2_pkey(self):
50       if not self.m2key:
51          self.m2key = M2Crypto.EVP.load_key_string(self.as_pem())
52       return self.m2key
53
54    def get_openssl_pkey(self):
55       return self.key
56
57    def is_same(self, pkey):
58       return self.as_pem() == pkey.as_pem()
59
60 # Certificate
61 #
62 # Represents an X.509 certificate. Support is included for a list of
63 # certificates by use of a "parent" member. See load_from_string() and
64 # save_to_string() for insight into how a recursive chain of certs is
65 # serialized.
66 #
67 # Certificates support an application-defined "data" field, which is
68 # stored in the subjectAltName field of the X.509 certificate.
69
70 class Certificate:
71    digest = "md5"
72
73    data = None
74    cert = None
75    issuerKey = None
76    issuerSubject = None
77    parent = None
78
79    def __init__(self, create=False, subject=None, string=None, filename=None):
80        if create or subject:
81            self.create()
82        if subject:
83            self.set_subject(subject)
84        if string:
85            self.load_from_string(string)
86        if filename:
87            self.load_from_file(filename)
88
89    def create(self):
90        self.cert = crypto.X509()
91        self.cert.set_serial_number(1)
92        self.cert.gmtime_adj_notBefore(0)
93        self.cert.gmtime_adj_notAfter(60*60*24*365*5) # five years
94
95    def load_from_pyopenssl_x509(self, x509):
96        self.cert = x509
97
98    def load_from_string(self, string):
99        # if it is a chain of multiple certs, then split off the first one and
100        # load it
101        parts = string.split("-----parent-----", 1)
102        self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, parts[0])
103
104        # if there are more certs, then create a parent and let the parent load
105        # itself from the remainder of the string
106        if len(parts) > 1:
107            self.parent = Certificate()
108            self.parent.load_from_string(parts[1])
109
110
111    def load_from_file(self, filename):
112        file = open(filename)
113        string = file.read()
114        self.load_from_string(string)
115
116    def save_to_string(self, save_parents=False):
117        string = crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert)
118        if save_parents and self.parent:
119           string = string + "-----parent-----" + self.parent.save_to_string(save_parents)
120        return string
121
122    def save_to_file(self, filename, save_parents=False):
123        string = self.save_to_string(save_parents=save_parents)
124        open(filename, 'w').write(string)
125
126    def set_issuer(self, key, subject=None, cert=None):
127        self.issuerKey = key
128        if subject:
129           # it's a mistake to use subject and cert params at the same time
130           assert(not cert)
131           if isinstance(subject, dict) or isinstance(subject, str):
132              req = crypto.X509Req()
133              reqSubject = req.get_subject()
134              if (isinstance(subject, dict)):
135                 for key in reqSubject.keys():
136                     setattr(reqSubject, key, name[key])
137              else:
138                 setattr(reqSubject, "CN", subject)
139              subject = reqSubject
140              # subject is not valid once req is out of scope, so save req
141              self.issuerReq = req
142        if cert:
143           # if a cert was supplied, then get the subject from the cert
144           subject = cert.cert.get_issuer()
145        assert(subject)
146        self.issuerSubject = subject
147
148    def get_issuer(self, which="CN"):
149        x = self.cert.get_issuer()
150        return getattr(x, which)
151
152    def set_subject(self, name):
153        req = crypto.X509Req()
154        subj = req.get_subject()
155        if (isinstance(name, dict)):
156            for key in name.keys():
157                setattr(subj, key, name[key])
158        else:
159            setattr(subj, "CN", name)
160        self.cert.set_subject(subj)
161
162    def get_subject(self, which="CN"):
163        x = self.cert.get_subject()
164        return getattr(x, which)
165
166    def set_pubkey(self, key):
167        assert(isinstance(key, Keypair))
168        self.cert.set_pubkey(key.get_openssl_pkey())
169
170    def get_pubkey(self):
171        m2x509 = X509.load_cert_string(self.save_to_string())
172        pkey = Keypair()
173        pkey.key = self.cert.get_pubkey()
174        pkey.m2key = m2x509.get_pubkey()
175        return pkey
176
177    def add_extension(self, name, critical, value):
178        ext = crypto.X509Extension (name, critical, value)
179        self.cert.add_extensions([ext])
180
181    def get_extension(self, name):
182        # pyOpenSSL does not have a way to get extensions
183        m2x509 = X509.load_cert_string(self.save_to_string())
184        value = m2x509.get_ext(name).get_value()
185        return value
186
187    def set_data(self, str):
188        # pyOpenSSL only allows us to add extensions, so if we try to set the
189        # same extension more than once, it will not work
190        if self.data != None:
191           raise "cannot set subjectAltName more than once"
192        self.data = str
193        self.add_extension("subjectAltName", 0, "URI:http://" + str)
194
195    def get_data(self):
196        if self.data:
197            return self.data
198
199        try:
200            uri = self.get_extension("subjectAltName")
201        except LookupError:
202            self.data = None
203            return self.data
204
205        if not uri.startswith("URI:http://"):
206            raise "bad encoding in subjectAltName"
207        self.data = uri[11:]
208        return self.data
209
210    def sign(self):
211        assert self.cert != None
212        assert self.issuerSubject != None
213        assert self.issuerKey != None
214        self.cert.set_issuer(self.issuerSubject)
215        self.cert.sign(self.issuerKey.get_openssl_pkey(), self.digest)
216
217    def verify(self, pkey):
218        # pyOpenSSL does not have a way to verify signatures
219        m2x509 = X509.load_cert_string(self.save_to_string())
220        m2pkey = pkey.get_m2_pkey()
221        # verify it
222        return m2x509.verify(m2pkey)
223
224        # XXX alternatively, if openssl has been patched, do the much simpler:
225        # try:
226        #   self.cert.verify(pkey.get_openssl_key())
227        #   return 1
228        # except:
229        #   return 0
230
231    def is_pubkey(self, pkey):
232        return self.get_pubkey().is_same(pkey)
233
234    def is_signed_by_cert(self, cert):
235        k = cert.get_pubkey()
236        result = self.verify(k)
237        return result
238
239    def set_parent(self, p):
240         self.parent = p
241
242    def get_parent(self):
243         return self.parent
244
245    def verify_chain(self, trusted_certs = None):
246         # Verify a chain of certificates. Each certificate must be signed by
247         # the public key contained in it's parent. The chain is recursed
248         # until a certificate is found that is signed by a trusted root.
249
250         # TODO: verify expiration time
251
252         # if this cert is signed by a trusted_cert, then we are set
253         for trusted_cert in trusted_certs:
254             # TODO: verify expiration of trusted_cert ?
255             if self.is_signed_by_cert(trusted_cert):
256                 #print self.get_subject(), "is signed by a root"
257                 return True
258
259         # if there is no parent, then no way to verify the chain
260         if not self.parent:
261             #print self.get_subject(), "has no parent"
262             return False
263
264         # if it wasn't signed by the parent...
265         if not self.is_signed_by_cert(self.parent):
266             #print self.get_subject(), "is not signed by parent"
267             return False
268
269         # if the parent isn't verified...
270         if not self.parent.verify_chain(trusted_certs):
271             #print self.get_subject(), "parent does not verify"
272             return False
273
274         return True