35435cd78e9bef306e7a360c20f4d0aef3a4d2f1
[sfa.git] / sfa / trust / certificate.py
1 #----------------------------------------------------------------------\r
2 # Copyright (c) 2008 Board of Trustees, Princeton University\r
3 #\r
4 # Permission is hereby granted, free of charge, to any person obtaining\r
5 # a copy of this software and/or hardware specification (the "Work") to\r
6 # deal in the Work without restriction, including without limitation the\r
7 # rights to use, copy, modify, merge, publish, distribute, sublicense,\r
8 # and/or sell copies of the Work, and to permit persons to whom the Work\r
9 # is furnished to do so, subject to the following conditions:\r
10 #\r
11 # The above copyright notice and this permission notice shall be\r
12 # included in all copies or substantial portions of the Work.\r
13 #\r
14 # THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS \r
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF \r
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND \r
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT \r
18 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, \r
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, \r
20 # OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS \r
21 # IN THE WORK.\r
22 #----------------------------------------------------------------------\r
23 \r
24 ##\r
25 # SFA uses two crypto libraries: pyOpenSSL and M2Crypto to implement\r
26 # the necessary crypto functionality. Ideally just one of these libraries\r
27 # would be used, but unfortunately each of these libraries is independently\r
28 # lacking. The pyOpenSSL library is missing many necessary functions, and\r
29 # the M2Crypto library has crashed inside of some of the functions. The\r
30 # design decision is to use pyOpenSSL whenever possible as it seems more\r
31 # stable, and only use M2Crypto for those functions that are not possible\r
32 # in pyOpenSSL.\r
33 #\r
34 # This module exports two classes: Keypair and Certificate.\r
35 ##\r
36 #\r
37 \r
38 import functools\r
39 import os\r
40 import tempfile\r
41 import base64\r
42 from tempfile import mkstemp\r
43 \r
44 from OpenSSL import crypto\r
45 import M2Crypto\r
46 from M2Crypto import X509\r
47 \r
48 from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent\r
49 from sfa.util.sfalogging import logger\r
50 \r
51 glo_passphrase_callback = None\r
52 \r
53 ##\r
54 # A global callback may be implemented for requesting passphrases from the\r
55 # user. The function will be called with three arguments:\r
56 #\r
57 #    keypair_obj: the keypair object that is calling the passphrase\r
58 #    string: the string containing the private key that's being loaded\r
59 #    x: unknown, appears to be 0, comes from pyOpenSSL and/or m2crypto\r
60 #\r
61 # The callback should return a string containing the passphrase.\r
62 \r
63 def set_passphrase_callback(callback_func):\r
64     global glo_passphrase_callback\r
65 \r
66     glo_passphrase_callback = callback_func\r
67 \r
68 ##\r
69 # Sets a fixed passphrase.\r
70 \r
71 def set_passphrase(passphrase):\r
72     set_passphrase_callback( lambda k,s,x: passphrase )\r
73 \r
74 ##\r
75 # Check to see if a passphrase works for a particular private key string.\r
76 # Intended to be used by passphrase callbacks for input validation.\r
77 \r
78 def test_passphrase(string, passphrase):\r
79     try:\r
80         crypto.load_privatekey(crypto.FILETYPE_PEM, string, (lambda x: passphrase))\r
81         return True\r
82     except:\r
83         return False\r
84 \r
85 def convert_public_key(key):\r
86     keyconvert_path = "/usr/bin/keyconvert.py"\r
87     if not os.path.isfile(keyconvert_path):\r
88         raise IOError, "Could not find keyconvert in %s" % keyconvert_path\r
89 \r
90     # we can only convert rsa keys\r
91     if "ssh-dss" in key:\r
92         raise Exception, "keyconvert: dss keys are not supported"\r
93 \r
94     (ssh_f, ssh_fn) = tempfile.mkstemp()\r
95     ssl_fn = tempfile.mktemp()\r
96     os.write(ssh_f, key)\r
97     os.close(ssh_f)\r
98 \r
99     cmd = keyconvert_path + " " + ssh_fn + " " + ssl_fn\r
100     os.system(cmd)\r
101 \r
102     # this check leaves the temporary file containing the public key so\r
103     # that it can be expected to see why it failed.\r
104     # TODO: for production, cleanup the temporary files\r
105     if not os.path.exists(ssl_fn):\r
106         raise Exception, "keyconvert: generated certificate not found. keyconvert may have failed."\r
107 \r
108     k = Keypair()\r
109     try:\r
110         k.load_pubkey_from_file(ssl_fn)\r
111         return k\r
112     except:\r
113         logger.log_exc("convert_public_key caught exception")\r
114         raise\r
115     finally:\r
116         # remove the temporary files\r
117         if os.path.exists(ssh_fn):\r
118             os.remove(ssh_fn)\r
119         if os.path.exists(ssl_fn):\r
120             os.remove(ssl_fn)\r
121 \r
122 ##\r
123 # Public-private key pairs are implemented by the Keypair class.\r
124 # A Keypair object may represent both a public and private key pair, or it\r
125 # may represent only a public key (this usage is consistent with OpenSSL).\r
126 \r
127 class Keypair:\r
128     key = None       # public/private keypair\r
129     m2key = None     # public key (m2crypto format)\r
130 \r
131     ##\r
132     # Creates a Keypair object\r
133     # @param create If create==True, creates a new public/private key and\r
134     #     stores it in the object\r
135     # @param string If string!=None, load the keypair from the string (PEM)\r
136     # @param filename If filename!=None, load the keypair from the file\r
137 \r
138     def __init__(self, create=False, string=None, filename=None):\r
139         if create:\r
140             self.create()\r
141         if string:\r
142             self.load_from_string(string)\r
143         if filename:\r
144             self.load_from_file(filename)\r
145 \r
146     ##\r
147     # Create a RSA public/private key pair and store it inside the keypair object\r
148 \r
149     def create(self):\r
150         self.key = crypto.PKey()\r
151         self.key.generate_key(crypto.TYPE_RSA, 1024)\r
152 \r
153     ##\r
154     # Save the private key to a file\r
155     # @param filename name of file to store the keypair in\r
156 \r
157     def save_to_file(self, filename):\r
158         open(filename, 'w').write(self.as_pem())\r
159         self.filename=filename\r
160 \r
161     ##\r
162     # Load the private key from a file. Implicity the private key includes the public key.\r
163 \r
164     def load_from_file(self, filename):\r
165         self.filename=filename\r
166         buffer = open(filename, 'r').read()\r
167         self.load_from_string(buffer)\r
168 \r
169     ##\r
170     # Load the private key from a string. Implicitly the private key includes the public key.\r
171 \r
172     def load_from_string(self, string):\r
173         if glo_passphrase_callback:\r
174             self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string) )\r
175             self.m2key = M2Crypto.EVP.load_key_string(string, functools.partial(glo_passphrase_callback, self, string) )\r
176         else:\r
177             self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string)\r
178             self.m2key = M2Crypto.EVP.load_key_string(string)\r
179 \r
180     ##\r
181     #  Load the public key from a string. No private key is loaded.\r
182 \r
183     def load_pubkey_from_file(self, filename):\r
184         # load the m2 public key\r
185         m2rsakey = M2Crypto.RSA.load_pub_key(filename)\r
186         self.m2key = M2Crypto.EVP.PKey()\r
187         self.m2key.assign_rsa(m2rsakey)\r
188 \r
189         # create an m2 x509 cert\r
190         m2name = M2Crypto.X509.X509_Name()\r
191         m2name.add_entry_by_txt(field="CN", type=0x1001, entry="junk", len=-1, loc=-1, set=0)\r
192         m2x509 = M2Crypto.X509.X509()\r
193         m2x509.set_pubkey(self.m2key)\r
194         m2x509.set_serial_number(0)\r
195         m2x509.set_issuer_name(m2name)\r
196         m2x509.set_subject_name(m2name)\r
197         ASN1 = M2Crypto.ASN1.ASN1_UTCTIME()\r
198         ASN1.set_time(500)\r
199         m2x509.set_not_before(ASN1)\r
200         m2x509.set_not_after(ASN1)\r
201         # x509v3 so it can have extensions\r
202         # prob not necc since this cert itself is junk but still...\r
203         m2x509.set_version(2)\r
204         junk_key = Keypair(create=True)\r
205         m2x509.sign(pkey=junk_key.get_m2_pkey(), md="sha1")\r
206 \r
207         # convert the m2 x509 cert to a pyopenssl x509\r
208         m2pem = m2x509.as_pem()\r
209         pyx509 = crypto.load_certificate(crypto.FILETYPE_PEM, m2pem)\r
210 \r
211         # get the pyopenssl pkey from the pyopenssl x509\r
212         self.key = pyx509.get_pubkey()\r
213         self.filename=filename\r
214 \r
215     ##\r
216     # Load the public key from a string. No private key is loaded.\r
217 \r
218     def load_pubkey_from_string(self, string):\r
219         (f, fn) = tempfile.mkstemp()\r
220         os.write(f, string)\r
221         os.close(f)\r
222         self.load_pubkey_from_file(fn)\r
223         os.remove(fn)\r
224 \r
225     ##\r
226     # Return the private key in PEM format.\r
227 \r
228     def as_pem(self):\r
229         return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.key)\r
230 \r
231     ##\r
232     # Return an M2Crypto key object\r
233 \r
234     def get_m2_pkey(self):\r
235         if not self.m2key:\r
236             self.m2key = M2Crypto.EVP.load_key_string(self.as_pem())\r
237         return self.m2key\r
238 \r
239     ##\r
240     # Returns a string containing the public key represented by this object.\r
241 \r
242     def get_pubkey_string(self):\r
243         m2pkey = self.get_m2_pkey()\r
244         return base64.b64encode(m2pkey.as_der())\r
245 \r
246     ##\r
247     # Return an OpenSSL pkey object\r
248 \r
249     def get_openssl_pkey(self):\r
250         return self.key\r
251 \r
252     ##\r
253     # Given another Keypair object, return TRUE if the two keys are the same.\r
254 \r
255     def is_same(self, pkey):\r
256         return self.as_pem() == pkey.as_pem()\r
257 \r
258     def sign_string(self, data):\r
259         k = self.get_m2_pkey()\r
260         k.sign_init()\r
261         k.sign_update(data)\r
262         return base64.b64encode(k.sign_final())\r
263 \r
264     def verify_string(self, data, sig):\r
265         k = self.get_m2_pkey()\r
266         k.verify_init()\r
267         k.verify_update(data)\r
268         return M2Crypto.m2.verify_final(k.ctx, base64.b64decode(sig), k.pkey)\r
269 \r
270     def compute_hash(self, value):\r
271         return self.sign_string(str(value))\r
272 \r
273     # only informative\r
274     def get_filename(self):\r
275         return getattr(self,'filename',None)\r
276 \r
277     def dump (self, *args, **kwargs):\r
278         print self.dump_string(*args, **kwargs)\r
279 \r
280     def dump_string (self):\r
281         result=""\r
282         result += "KEYPAIR: pubkey=%40s..."%self.get_pubkey_string()\r
283         filename=self.get_filename()\r
284         if filename: result += "Filename %s\n"%filename\r
285         return result\r
286 \r
287 ##\r
288 # The certificate class implements a general purpose X509 certificate, making\r
289 # use of the appropriate pyOpenSSL or M2Crypto abstractions. It also adds\r
290 # several addition features, such as the ability to maintain a chain of\r
291 # parent certificates, and storage of application-specific data.\r
292 #\r
293 # Certificates include the ability to maintain a chain of parents. Each\r
294 # certificate includes a pointer to it's parent certificate. When loaded\r
295 # from a file or a string, the parent chain will be automatically loaded.\r
296 # When saving a certificate to a file or a string, the caller can choose\r
297 # whether to save the parent certificates as well.\r
298 \r
299 class Certificate:\r
300     digest = "md5"\r
301 \r
302     cert = None\r
303     issuerKey = None\r
304     issuerSubject = None\r
305     parent = None\r
306     isCA = None # will be a boolean once set\r
307 \r
308     separator="-----parent-----"\r
309 \r
310     ##\r
311     # Create a certificate object.\r
312     #\r
313     # @param lifeDays life of cert in days - default is 1825==5 years\r
314     # @param create If create==True, then also create a blank X509 certificate.\r
315     # @param subject If subject!=None, then create a blank certificate and set\r
316     #     it's subject name.\r
317     # @param string If string!=None, load the certficate from the string.\r
318     # @param filename If filename!=None, load the certficiate from the file.\r
319     # @param isCA If !=None, set whether this cert is for a CA\r
320 \r
321     def __init__(self, lifeDays=1825, create=False, subject=None, string=None, filename=None, isCA=None):\r
322         self.data = {}\r
323         if create or subject:\r
324             self.create(lifeDays)\r
325         if subject:\r
326             self.set_subject(subject)\r
327         if string:\r
328             self.load_from_string(string)\r
329         if filename:\r
330             self.load_from_file(filename)\r
331 \r
332         # Set the CA bit if a value was supplied\r
333         if isCA != None:\r
334             self.set_is_ca(isCA)\r
335 \r
336     # Create a blank X509 certificate and store it in this object.\r
337 \r
338     def create(self, lifeDays=1825):\r
339         self.cert = crypto.X509()\r
340         # FIXME: Use different serial #s\r
341         self.cert.set_serial_number(3)\r
342         self.cert.gmtime_adj_notBefore(0) # 0 means now\r
343         self.cert.gmtime_adj_notAfter(lifeDays*60*60*24) # five years is default\r
344         self.cert.set_version(2) # x509v3 so it can have extensions\r
345 \r
346 \r
347     ##\r
348     # Given a pyOpenSSL X509 object, store that object inside of this\r
349     # certificate object.\r
350 \r
351     def load_from_pyopenssl_x509(self, x509):\r
352         self.cert = x509\r
353 \r
354     ##\r
355     # Load the certificate from a string\r
356 \r
357     def load_from_string(self, string):\r
358         # if it is a chain of multiple certs, then split off the first one and\r
359         # load it (support for the ---parent--- tag as well as normal chained certs)\r
360 \r
361         string = string.strip()\r
362         \r
363         # If it's not in proper PEM format, wrap it\r
364         if string.count('-----BEGIN CERTIFICATE') == 0:\r
365             string = '-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----' % string\r
366 \r
367         # If there is a PEM cert in there, but there is some other text first\r
368         # such as the text of the certificate, skip the text\r
369         beg = string.find('-----BEGIN CERTIFICATE')\r
370         if beg > 0:\r
371             # skipping over non cert beginning                                                                                                              \r
372             string = string[beg:]\r
373 \r
374         parts = []\r
375 \r
376         if string.count('-----BEGIN CERTIFICATE-----') > 1 and \\r
377                string.count(Certificate.separator) == 0:\r
378             parts = string.split('-----END CERTIFICATE-----',1)\r
379             parts[0] += '-----END CERTIFICATE-----'\r
380         else:\r
381             parts = string.split(Certificate.separator, 1)\r
382 \r
383         self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, parts[0])\r
384 \r
385         # if there are more certs, then create a parent and let the parent load\r
386         # itself from the remainder of the string\r
387         if len(parts) > 1 and parts[1] != '':\r
388             self.parent = self.__class__()\r
389             self.parent.load_from_string(parts[1])\r
390 \r
391     ##\r
392     # Load the certificate from a file\r
393 \r
394     def load_from_file(self, filename):\r
395         file = open(filename)\r
396         string = file.read()\r
397         self.load_from_string(string)\r
398         self.filename=filename\r
399 \r
400     ##\r
401     # Save the certificate to a string.\r
402     #\r
403     # @param save_parents If save_parents==True, then also save the parent certificates.\r
404 \r
405     def save_to_string(self, save_parents=True):\r
406         string = crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert)\r
407         if save_parents and self.parent:\r
408             string = string + self.parent.save_to_string(save_parents)\r
409         return string\r
410 \r
411     ##\r
412     # Save the certificate to a file.\r
413     # @param save_parents If save_parents==True, then also save the parent certificates.\r
414 \r
415     def save_to_file(self, filename, save_parents=True, filep=None):\r
416         string = self.save_to_string(save_parents=save_parents)\r
417         if filep:\r
418             f = filep\r
419         else:\r
420             f = open(filename, 'w')\r
421         f.write(string)\r
422         f.close()\r
423         self.filename=filename\r
424 \r
425     ##\r
426     # Save the certificate to a random file in /tmp/\r
427     # @param save_parents If save_parents==True, then also save the parent certificates.\r
428     def save_to_random_tmp_file(self, save_parents=True):\r
429         fp, filename = mkstemp(suffix='cert', text=True)\r
430         fp = os.fdopen(fp, "w")\r
431         self.save_to_file(filename, save_parents=True, filep=fp)\r
432         return filename\r
433 \r
434     ##\r
435     # Sets the issuer private key and name\r
436     # @param key Keypair object containing the private key of the issuer\r
437     # @param subject String containing the name of the issuer\r
438     # @param cert (optional) Certificate object containing the name of the issuer\r
439 \r
440     def set_issuer(self, key, subject=None, cert=None):\r
441         self.issuerKey = key\r
442         if subject:\r
443             # it's a mistake to use subject and cert params at the same time\r
444             assert(not cert)\r
445             if isinstance(subject, dict) or isinstance(subject, str):\r
446                 req = crypto.X509Req()\r
447                 reqSubject = req.get_subject()\r
448                 if (isinstance(subject, dict)):\r
449                     for key in reqSubject.keys():\r
450                         setattr(reqSubject, key, subject[key])\r
451                 else:\r
452                     setattr(reqSubject, "CN", subject)\r
453                 subject = reqSubject\r
454                 # subject is not valid once req is out of scope, so save req\r
455                 self.issuerReq = req\r
456         if cert:\r
457             # if a cert was supplied, then get the subject from the cert\r
458             subject = cert.cert.get_subject()\r
459         assert(subject)\r
460         self.issuerSubject = subject\r
461 \r
462     ##\r
463     # Get the issuer name\r
464 \r
465     def get_issuer(self, which="CN"):\r
466         x = self.cert.get_issuer()\r
467         return getattr(x, which)\r
468 \r
469     ##\r
470     # Set the subject name of the certificate\r
471 \r
472     def set_subject(self, name):\r
473         req = crypto.X509Req()\r
474         subj = req.get_subject()\r
475         if (isinstance(name, dict)):\r
476             for key in name.keys():\r
477                 setattr(subj, key, name[key])\r
478         else:\r
479             setattr(subj, "CN", name)\r
480         self.cert.set_subject(subj)\r
481 \r
482     ##\r
483     # Get the subject name of the certificate\r
484 \r
485     def get_subject(self, which="CN"):\r
486         x = self.cert.get_subject()\r
487         return getattr(x, which)\r
488 \r
489     ##\r
490     # Get a pretty-print subject name of the certificate\r
491 \r
492     def get_printable_subject(self):\r
493         x = self.cert.get_subject()\r
494         return "[ OU: %s, CN: %s, SubjectAltName: %s ]" % (getattr(x, "OU"), getattr(x, "CN"), self.get_data())\r
495 \r
496     ##\r
497     # Get the public key of the certificate.\r
498     #\r
499     # @param key Keypair object containing the public key\r
500 \r
501     def set_pubkey(self, key):\r
502         assert(isinstance(key, Keypair))\r
503         self.cert.set_pubkey(key.get_openssl_pkey())\r
504 \r
505     ##\r
506     # Get the public key of the certificate.\r
507     # It is returned in the form of a Keypair object.\r
508 \r
509     def get_pubkey(self):\r
510         m2x509 = X509.load_cert_string(self.save_to_string())\r
511         pkey = Keypair()\r
512         pkey.key = self.cert.get_pubkey()\r
513         pkey.m2key = m2x509.get_pubkey()\r
514         return pkey\r
515 \r
516     def set_intermediate_ca(self, val):\r
517         return self.set_is_ca(val)\r
518 \r
519     # Set whether this cert is for a CA. All signers and only signers should be CAs.\r
520     # The local member starts unset, letting us check that you only set it once\r
521     # @param val Boolean indicating whether this cert is for a CA\r
522     def set_is_ca(self, val):\r
523         if val is None:\r
524             return\r
525 \r
526         if self.isCA != None:\r
527             # Can't double set properties\r
528             raise Exception, "Cannot set basicConstraints CA:?? more than once. Was %s, trying to set as %s" % (self.isCA, val)\r
529 \r
530         self.isCA = val\r
531         if val:\r
532             self.add_extension('basicConstraints', 1, 'CA:TRUE')\r
533         else:\r
534             self.add_extension('basicConstraints', 1, 'CA:FALSE')\r
535 \r
536 \r
537 \r
538     ##\r
539     # Add an X509 extension to the certificate. Add_extension can only be called\r
540     # once for a particular extension name, due to limitations in the underlying\r
541     # library.\r
542     #\r
543     # @param name string containing name of extension\r
544     # @param value string containing value of the extension\r
545 \r
546     def add_extension(self, name, critical, value):\r
547         oldExtVal = None\r
548         try:\r
549             oldExtVal = self.get_extension(name)\r
550         except:\r
551             # M2Crypto LookupError when the extension isn't there (yet)\r
552             pass\r
553 \r
554         # This code limits you from adding the extension with the same value\r
555         # The method comment says you shouldn't do this with the same name\r
556         # But actually it (m2crypto) appears to allow you to do this.\r
557         if oldExtVal and oldExtVal == value:\r
558             # don't add this extension again\r
559             # just do nothing as here\r
560             return\r
561         # FIXME: What if they are trying to set with a different value?\r
562         # Is this ever OK? Or should we raise an exception?\r
563 #        elif oldExtVal:\r
564 #            raise "Cannot add extension %s which had val %s with new val %s" % (name, oldExtVal, value)\r
565 \r
566         ext = crypto.X509Extension (name, critical, value)\r
567         self.cert.add_extensions([ext])\r
568 \r
569     ##\r
570     # Get an X509 extension from the certificate\r
571 \r
572     def get_extension(self, name):\r
573 \r
574         # pyOpenSSL does not have a way to get extensions\r
575         m2x509 = X509.load_cert_string(self.save_to_string())\r
576         value = m2x509.get_ext(name).get_value()\r
577 \r
578         return value\r
579 \r
580     ##\r
581     # Set_data is a wrapper around add_extension. It stores the parameter str in\r
582     # the X509 subject_alt_name extension. Set_data can only be called once, due\r
583     # to limitations in the underlying library.\r
584 \r
585     def set_data(self, str, field='subjectAltName'):\r
586         # pyOpenSSL only allows us to add extensions, so if we try to set the\r
587         # same extension more than once, it will not work\r
588         if self.data.has_key(field):\r
589             raise "Cannot set ", field, " more than once"\r
590         self.data[field] = str\r
591         self.add_extension(field, 0, str)\r
592 \r
593     ##\r
594     # Return the data string that was previously set with set_data\r
595 \r
596     def get_data(self, field='subjectAltName'):\r
597         if self.data.has_key(field):\r
598             return self.data[field]\r
599 \r
600         try:\r
601             uri = self.get_extension(field)\r
602             self.data[field] = uri\r
603         except LookupError:\r
604             return None\r
605 \r
606         return self.data[field]\r
607 \r
608     ##\r
609     # Sign the certificate using the issuer private key and issuer subject previous set with set_issuer().\r
610 \r
611     def sign(self):\r
612         logger.debug('certificate.sign')\r
613         assert self.cert != None\r
614         assert self.issuerSubject != None\r
615         assert self.issuerKey != None\r
616         self.cert.set_issuer(self.issuerSubject)\r
617         self.cert.sign(self.issuerKey.get_openssl_pkey(), self.digest)\r
618 \r
619     ##\r
620     # Verify the authenticity of a certificate.\r
621     # @param pkey is a Keypair object representing a public key. If Pkey\r
622     #     did not sign the certificate, then an exception will be thrown.\r
623 \r
624     def verify(self, pkey):\r
625         # pyOpenSSL does not have a way to verify signatures\r
626         m2x509 = X509.load_cert_string(self.save_to_string())\r
627         m2pkey = pkey.get_m2_pkey()\r
628         # verify it\r
629         return m2x509.verify(m2pkey)\r
630 \r
631         # XXX alternatively, if openssl has been patched, do the much simpler:\r
632         # try:\r
633         #   self.cert.verify(pkey.get_openssl_key())\r
634         #   return 1\r
635         # except:\r
636         #   return 0\r
637 \r
638     ##\r
639     # Return True if pkey is identical to the public key that is contained in the certificate.\r
640     # @param pkey Keypair object\r
641 \r
642     def is_pubkey(self, pkey):\r
643         return self.get_pubkey().is_same(pkey)\r
644 \r
645     ##\r
646     # Given a certificate cert, verify that this certificate was signed by the\r
647     # public key contained in cert. Throw an exception otherwise.\r
648     #\r
649     # @param cert certificate object\r
650 \r
651     def is_signed_by_cert(self, cert):\r
652         k = cert.get_pubkey()\r
653         result = self.verify(k)\r
654         return result\r
655 \r
656     ##\r
657     # Set the parent certficiate.\r
658     #\r
659     # @param p certificate object.\r
660 \r
661     def set_parent(self, p):\r
662         self.parent = p\r
663 \r
664     ##\r
665     # Return the certificate object of the parent of this certificate.\r
666 \r
667     def get_parent(self):\r
668         return self.parent\r
669 \r
670     ##\r
671     # Verification examines a chain of certificates to ensure that each parent\r
672     # signs the child, and that some certificate in the chain is signed by a\r
673     # trusted certificate.\r
674     #\r
675     # Verification is a basic recursion: <pre>\r
676     #     if this_certificate was signed by trusted_certs:\r
677     #         return\r
678     #     else\r
679     #         return verify_chain(parent, trusted_certs)\r
680     # </pre>\r
681     #\r
682     # At each recursion, the parent is tested to ensure that it did sign the\r
683     # child. If a parent did not sign a child, then an exception is thrown. If\r
684     # the bottom of the recursion is reached and the certificate does not match\r
685     # a trusted root, then an exception is thrown.\r
686     # Also require that parents are CAs.\r
687     #\r
688     # @param Trusted_certs is a list of certificates that are trusted.\r
689     #\r
690 \r
691     def verify_chain(self, trusted_certs = None):\r
692         # Verify a chain of certificates. Each certificate must be signed by\r
693         # the public key contained in it's parent. The chain is recursed\r
694         # until a certificate is found that is signed by a trusted root.\r
695 \r
696         # verify expiration time\r
697         if self.cert.has_expired():\r
698             logger.debug("verify_chain: NO, Certificate %s has expired" % self.get_printable_subject())\r
699             raise CertExpired(self.get_printable_subject(), "client cert")\r
700 \r
701         # if this cert is signed by a trusted_cert, then we are set\r
702         for trusted_cert in trusted_certs:\r
703             if self.is_signed_by_cert(trusted_cert):\r
704                 # verify expiration of trusted_cert ?\r
705                 if not trusted_cert.cert.has_expired():\r
706                     logger.debug("verify_chain: YES. Cert %s signed by trusted cert %s"%(\r
707                             self.get_printable_subject(), trusted_cert.get_printable_subject()))\r
708                     return trusted_cert\r
709                 else:\r
710                     logger.debug("verify_chain: NO. Cert %s is signed by trusted_cert %s, but that signer is expired..."%(\r
711                             self.get_printable_subject(),trusted_cert.get_printable_subject()))\r
712                     raise CertExpired(self.get_printable_subject()," signer trusted_cert %s"%trusted_cert.get_printable_subject())\r
713 \r
714         # if there is no parent, then no way to verify the chain\r
715         if not self.parent:\r
716             logger.debug("verify_chain: NO. %s has no parent and issuer %s is not in %d trusted roots"%(self.get_printable_subject(), self.get_issuer(), len(trusted_certs)))\r
717             raise CertMissingParent(self.get_printable_subject() + ": Issuer %s is not one of the %d trusted roots, and cert has no parent." % (self.get_issuer(), len(trusted_certs)))\r
718 \r
719         # if it wasn't signed by the parent...\r
720         if not self.is_signed_by_cert(self.parent):\r
721             logger.debug("verify_chain: NO. %s is not signed by parent %s, but by %s"%\\r
722                              (self.get_printable_subject(), \r
723                               self.parent.get_printable_subject(), \r
724                               self.get_issuer()))\r
725             raise CertNotSignedByParent("%s: Parent %s, issuer %s"\\r
726                                             % (self.get_printable_subject(), \r
727                                                self.parent.get_printable_subject(),\r
728                                                self.get_issuer()))\r
729 \r
730         # Confirm that the parent is a CA. Only CAs can be trusted as\r
731         # signers.\r
732         # Note that trusted roots are not parents, so don't need to be\r
733         # CAs.\r
734         # Ugly - cert objects aren't parsed so we need to read the\r
735         # extension and hope there are no other basicConstraints\r
736         if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):\r
737             logger.warn("verify_chain: cert %s's parent %s is not a CA" % \\r
738                             (self.get_printable_subject(), self.parent.get_printable_subject()))\r
739             raise CertNotSignedByParent("%s: Parent %s not a CA" % (self.get_printable_subject(),\r
740                                                                     self.parent.get_printable_subject()))\r
741 \r
742         # if the parent isn't verified...\r
743         logger.debug("verify_chain: .. %s, -> verifying parent %s"%\\r
744                          (self.get_printable_subject(),self.parent.get_printable_subject()))\r
745         self.parent.verify_chain(trusted_certs)\r
746 \r
747         return\r
748 \r
749     ### more introspection\r
750     def get_extensions(self):\r
751         # pyOpenSSL does not have a way to get extensions\r
752         triples=[]\r
753         m2x509 = X509.load_cert_string(self.save_to_string())\r
754         nb_extensions=m2x509.get_ext_count()\r
755         logger.debug("X509 had %d extensions"%nb_extensions)\r
756         for i in range(nb_extensions):\r
757             ext=m2x509.get_ext_at(i)\r
758             triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) )\r
759         return triples\r
760 \r
761     def get_data_names(self):\r
762         return self.data.keys()\r
763 \r
764     def get_all_datas (self):\r
765         triples=self.get_extensions()\r
766         for name in self.get_data_names():\r
767             triples.append( (name,self.get_data(name),'data',) )\r
768         return triples\r
769 \r
770     # only informative\r
771     def get_filename(self):\r
772         return getattr(self,'filename',None)\r
773 \r
774     def dump (self, *args, **kwargs):\r
775         print self.dump_string(*args, **kwargs)\r
776 \r
777     def dump_string (self,show_extensions=False):\r
778         result = ""\r
779         result += "CERTIFICATE for %s\n"%self.get_printable_subject()\r
780         result += "Issued by %s\n"%self.get_issuer()\r
781         filename=self.get_filename()\r
782         if filename: result += "Filename %s\n"%filename\r
783         if show_extensions:\r
784             all_datas=self.get_all_datas()\r
785             result += " has %d extensions/data attached"%len(all_datas)\r
786             for (n,v,c) in all_datas:\r
787                 if c=='data':\r
788                     result += "   data: %s=%s\n"%(n,v)\r
789                 else:\r
790                     result += "    ext: %s (crit=%s)=<<<%s>>>\n"%(n,c,v)\r
791         return result\r