added lifeDays param to constructor. Certs can only be signed by a CA
[sfa.git] / sfa / trust / certificate.py
1 #----------------------------------------------------------------------\r
2 # Copyright (c) 2008 Board of Trustees, Princeton University\r
3 #\r
4 # Permission is hereby granted, free of charge, to any person obtaining\r
5 # a copy of this software and/or hardware specification (the "Work") to\r
6 # deal in the Work without restriction, including without limitation the\r
7 # rights to use, copy, modify, merge, publish, distribute, sublicense,\r
8 # and/or sell copies of the Work, and to permit persons to whom the Work\r
9 # is furnished to do so, subject to the following conditions:\r
10 #\r
11 # The above copyright notice and this permission notice shall be\r
12 # included in all copies or substantial portions of the Work.\r
13 #\r
14 # THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS \r
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF \r
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND \r
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT \r
18 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, \r
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, \r
20 # OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS \r
21 # IN THE WORK.\r
22 #----------------------------------------------------------------------\r
23 \r
24 ##\r
25 # SFA uses two crypto libraries: pyOpenSSL and M2Crypto to implement\r
26 # the necessary crypto functionality. Ideally just one of these libraries\r
27 # would be used, but unfortunately each of these libraries is independently\r
28 # lacking. The pyOpenSSL library is missing many necessary functions, and\r
29 # the M2Crypto library has crashed inside of some of the functions. The\r
30 # design decision is to use pyOpenSSL whenever possible as it seems more\r
31 # stable, and only use M2Crypto for those functions that are not possible\r
32 # in pyOpenSSL.\r
33 #\r
34 # This module exports two classes: Keypair and Certificate.\r
35 ##\r
36 #\r
37 \r
38 import functools\r
39 import os\r
40 import tempfile\r
41 import base64\r
42 import traceback\r
43 from tempfile import mkstemp\r
44 \r
45 from OpenSSL import crypto\r
46 import M2Crypto\r
47 from M2Crypto import X509\r
48 \r
49 from sfa.util.sfalogging import logger\r
50 from sfa.util.xrn import urn_to_hrn\r
51 from sfa.util.faults import *\r
52 from sfa.util.sfalogging import logger\r
53 \r
54 glo_passphrase_callback = None\r
55 \r
56 ##\r
57 # A global callback msy be implemented for requesting passphrases from the\r
58 # user. The function will be called with three arguments:\r
59 #\r
60 #    keypair_obj: the keypair object that is calling the passphrase\r
61 #    string: the string containing the private key that's being loaded\r
62 #    x: unknown, appears to be 0, comes from pyOpenSSL and/or m2crypto\r
63 #\r
64 # The callback should return a string containing the passphrase.\r
65 \r
66 def set_passphrase_callback(callback_func):\r
67     global glo_passphrase_callback\r
68 \r
69     glo_passphrase_callback = callback_func\r
70 \r
71 ##\r
72 # Sets a fixed passphrase.\r
73 \r
74 def set_passphrase(passphrase):\r
75     set_passphrase_callback( lambda k,s,x: passphrase )\r
76 \r
77 ##\r
78 # Check to see if a passphrase works for a particular private key string.\r
79 # Intended to be used by passphrase callbacks for input validation.\r
80 \r
81 def test_passphrase(string, passphrase):\r
82     try:\r
83         crypto.load_privatekey(crypto.FILETYPE_PEM, string, (lambda x: passphrase))\r
84         return True\r
85     except:\r
86         return False\r
87 \r
88 def convert_public_key(key):\r
89     keyconvert_path = "/usr/bin/keyconvert.py"\r
90     if not os.path.isfile(keyconvert_path):\r
91         raise IOError, "Could not find keyconvert in %s" % keyconvert_path\r
92 \r
93     # we can only convert rsa keys\r
94     if "ssh-dss" in key:\r
95         return None\r
96 \r
97     (ssh_f, ssh_fn) = tempfile.mkstemp()\r
98     ssl_fn = tempfile.mktemp()\r
99     os.write(ssh_f, key)\r
100     os.close(ssh_f)\r
101 \r
102     cmd = keyconvert_path + " " + ssh_fn + " " + ssl_fn\r
103     os.system(cmd)\r
104 \r
105     # this check leaves the temporary file containing the public key so\r
106     # that it can be expected to see why it failed.\r
107     # TODO: for production, cleanup the temporary files\r
108     if not os.path.exists(ssl_fn):\r
109         return None\r
110 \r
111     k = Keypair()\r
112     try:\r
113         k.load_pubkey_from_file(ssl_fn)\r
114     except:\r
115         logger.log_exc("convert_public_key caught exception")\r
116         k = None\r
117 \r
118     # remove the temporary files\r
119     os.remove(ssh_fn)\r
120     os.remove(ssl_fn)\r
121 \r
122     return k\r
123 \r
124 ##\r
125 # Public-private key pairs are implemented by the Keypair class.\r
126 # A Keypair object may represent both a public and private key pair, or it\r
127 # may represent only a public key (this usage is consistent with OpenSSL).\r
128 \r
129 class Keypair:\r
130     key = None       # public/private keypair\r
131     m2key = None     # public key (m2crypto format)\r
132 \r
133     ##\r
134     # Creates a Keypair object\r
135     # @param create If create==True, creates a new public/private key and\r
136     #     stores it in the object\r
137     # @param string If string!=None, load the keypair from the string (PEM)\r
138     # @param filename If filename!=None, load the keypair from the file\r
139 \r
140     def __init__(self, create=False, string=None, filename=None):\r
141         if create:\r
142             self.create()\r
143         if string:\r
144             self.load_from_string(string)\r
145         if filename:\r
146             self.load_from_file(filename)\r
147 \r
148     ##\r
149     # Create a RSA public/private key pair and store it inside the keypair object\r
150 \r
151     def create(self):\r
152         self.key = crypto.PKey()\r
153         self.key.generate_key(crypto.TYPE_RSA, 1024)\r
154 \r
155     ##\r
156     # Save the private key to a file\r
157     # @param filename name of file to store the keypair in\r
158 \r
159     def save_to_file(self, filename):\r
160         open(filename, 'w').write(self.as_pem())\r
161         self.filename=filename\r
162 \r
163     ##\r
164     # Load the private key from a file. Implicity the private key includes the public key.\r
165 \r
166     def load_from_file(self, filename):\r
167         self.filename=filename\r
168         buffer = open(filename, 'r').read()\r
169         self.load_from_string(buffer)\r
170 \r
171     ##\r
172     # Load the private key from a string. Implicitly the private key includes the public key.\r
173 \r
174     def load_from_string(self, string):\r
175         if glo_passphrase_callback:\r
176             self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string) )\r
177             self.m2key = M2Crypto.EVP.load_key_string(string, functools.partial(glo_passphrase_callback, self, string) )\r
178         else:\r
179             self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string)\r
180             self.m2key = M2Crypto.EVP.load_key_string(string)\r
181 \r
182     ##\r
183     #  Load the public key from a string. No private key is loaded.\r
184 \r
185     def load_pubkey_from_file(self, filename):\r
186         # load the m2 public key\r
187         m2rsakey = M2Crypto.RSA.load_pub_key(filename)\r
188         self.m2key = M2Crypto.EVP.PKey()\r
189         self.m2key.assign_rsa(m2rsakey)\r
190 \r
191         # create an m2 x509 cert\r
192         m2name = M2Crypto.X509.X509_Name()\r
193         m2name.add_entry_by_txt(field="CN", type=0x1001, entry="junk", len=-1, loc=-1, set=0)\r
194         m2x509 = M2Crypto.X509.X509()\r
195         m2x509.set_pubkey(self.m2key)\r
196         m2x509.set_serial_number(0)\r
197         m2x509.set_issuer_name(m2name)\r
198         m2x509.set_subject_name(m2name)\r
199         ASN1 = M2Crypto.ASN1.ASN1_UTCTIME()\r
200         ASN1.set_time(500)\r
201         m2x509.set_not_before(ASN1)\r
202         m2x509.set_not_after(ASN1)\r
203         # x509v3 so it can have extensions\r
204         # prob not necc since this cert itself is junk but still...\r
205         m2x509.set_version(2)\r
206         junk_key = Keypair(create=True)\r
207         m2x509.sign(pkey=junk_key.get_m2_pkey(), md="sha1")\r
208 \r
209         # convert the m2 x509 cert to a pyopenssl x509\r
210         m2pem = m2x509.as_pem()\r
211         pyx509 = crypto.load_certificate(crypto.FILETYPE_PEM, m2pem)\r
212 \r
213         # get the pyopenssl pkey from the pyopenssl x509\r
214         self.key = pyx509.get_pubkey()\r
215         self.filename=filename\r
216 \r
217     ##\r
218     # Load the public key from a string. No private key is loaded.\r
219 \r
220     def load_pubkey_from_string(self, string):\r
221         (f, fn) = tempfile.mkstemp()\r
222         os.write(f, string)\r
223         os.close(f)\r
224         self.load_pubkey_from_file(fn)\r
225         os.remove(fn)\r
226 \r
227     ##\r
228     # Return the private key in PEM format.\r
229 \r
230     def as_pem(self):\r
231         return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.key)\r
232 \r
233     ##\r
234     # Return an M2Crypto key object\r
235 \r
236     def get_m2_pkey(self):\r
237         if not self.m2key:\r
238             self.m2key = M2Crypto.EVP.load_key_string(self.as_pem())\r
239         return self.m2key\r
240 \r
241     ##\r
242     # Returns a string containing the public key represented by this object.\r
243 \r
244     def get_pubkey_string(self):\r
245         m2pkey = self.get_m2_pkey()\r
246         return base64.b64encode(m2pkey.as_der())\r
247 \r
248     ##\r
249     # Return an OpenSSL pkey object\r
250 \r
251     def get_openssl_pkey(self):\r
252         return self.key\r
253 \r
254     ##\r
255     # Given another Keypair object, return TRUE if the two keys are the same.\r
256 \r
257     def is_same(self, pkey):\r
258         return self.as_pem() == pkey.as_pem()\r
259 \r
260     def sign_string(self, data):\r
261         k = self.get_m2_pkey()\r
262         k.sign_init()\r
263         k.sign_update(data)\r
264         return base64.b64encode(k.sign_final())\r
265 \r
266     def verify_string(self, data, sig):\r
267         k = self.get_m2_pkey()\r
268         k.verify_init()\r
269         k.verify_update(data)\r
270         return M2Crypto.m2.verify_final(k.ctx, base64.b64decode(sig), k.pkey)\r
271 \r
272     def compute_hash(self, value):\r
273         return self.sign_string(str(value))\r
274 \r
275     # only informative\r
276     def get_filename(self):\r
277         return getattr(self,'filename',None)\r
278 \r
279     def dump (self, *args, **kwargs):\r
280         print self.dump_string(*args, **kwargs)\r
281 \r
282     def dump_string (self):\r
283         result=""\r
284         result += "KEYPAIR: pubkey=%40s..."%self.get_pubkey_string()\r
285         filename=self.get_filename()\r
286         if filename: result += "Filename %s\n"%filename\r
287         return result\r
288 \r
289 ##\r
290 # The certificate class implements a general purpose X509 certificate, making\r
291 # use of the appropriate pyOpenSSL or M2Crypto abstractions. It also adds\r
292 # several addition features, such as the ability to maintain a chain of\r
293 # parent certificates, and storage of application-specific data.\r
294 #\r
295 # Certificates include the ability to maintain a chain of parents. Each\r
296 # certificate includes a pointer to it's parent certificate. When loaded\r
297 # from a file or a string, the parent chain will be automatically loaded.\r
298 # When saving a certificate to a file or a string, the caller can choose\r
299 # whether to save the parent certificates as well.\r
300 \r
301 class Certificate:\r
302     digest = "md5"\r
303 \r
304     cert = None\r
305     issuerKey = None\r
306     issuerSubject = None\r
307     parent = None\r
308     isCA = None # will be a boolean once set\r
309 \r
310     separator="-----parent-----"\r
311 \r
312     ##\r
313     # Create a certificate object.\r
314     #\r
315     # @param lifeDays life of cert in days - default is 1825==5 years\r
316     # @param create If create==True, then also create a blank X509 certificate.\r
317     # @param subject If subject!=None, then create a blank certificate and set\r
318     #     it's subject name.\r
319     # @param string If string!=None, load the certficate from the string.\r
320     # @param filename If filename!=None, load the certficiate from the file.\r
321     # @param isCA If !=None, set whether this cert is for a CA\r
322 \r
323     def __init__(self, lifeDays=1825, create=False, subject=None, string=None, filename=None, isCA=None):\r
324         self.data = {}\r
325         if create or subject:\r
326             self.create(lifeDays)\r
327         if subject:\r
328             self.set_subject(subject)\r
329         if string:\r
330             self.load_from_string(string)\r
331         if filename:\r
332             self.load_from_file(filename)\r
333 \r
334         # Set the CA bit if a value was supplied\r
335         if isCA != None:\r
336             self.set_is_ca(isCA)\r
337 \r
338     # Create a blank X509 certificate and store it in this object.\r
339 \r
340     def create(self, lifeDays=1825):\r
341         self.cert = crypto.X509()\r
342         # FIXME: Use different serial #s\r
343         self.cert.set_serial_number(3)\r
344         self.cert.gmtime_adj_notBefore(0) # 0 means now\r
345         self.cert.gmtime_adj_notAfter(lifeDays*60*60*24) # five years is default\r
346         self.cert.set_version(2) # x509v3 so it can have extensions\r
347 \r
348 \r
349     ##\r
350     # Given a pyOpenSSL X509 object, store that object inside of this\r
351     # certificate object.\r
352 \r
353     def load_from_pyopenssl_x509(self, x509):\r
354         self.cert = x509\r
355 \r
356     ##\r
357     # Load the certificate from a string\r
358 \r
359     def load_from_string(self, string):\r
360         # if it is a chain of multiple certs, then split off the first one and\r
361         # load it (support for the ---parent--- tag as well as normal chained certs)\r
362 \r
363         string = string.strip()\r
364         \r
365         # If it's not in proper PEM format, wrap it\r
366         if string.count('-----BEGIN CERTIFICATE') == 0:\r
367             string = '-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----' % string\r
368 \r
369         # If there is a PEM cert in there, but there is some other text first\r
370         # such as the text of the certificate, skip the text\r
371         beg = string.find('-----BEGIN CERTIFICATE')\r
372         if beg > 0:\r
373             # skipping over non cert beginning                                                                                                              \r
374             string = string[beg:]\r
375 \r
376         parts = []\r
377 \r
378         if string.count('-----BEGIN CERTIFICATE-----') > 1 and \\r
379                string.count(Certificate.separator) == 0:\r
380             parts = string.split('-----END CERTIFICATE-----',1)\r
381             parts[0] += '-----END CERTIFICATE-----'\r
382         else:\r
383             parts = string.split(Certificate.separator, 1)\r
384 \r
385         self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, parts[0])\r
386 \r
387         # if there are more certs, then create a parent and let the parent load\r
388         # itself from the remainder of the string\r
389         if len(parts) > 1 and parts[1] != '':\r
390             self.parent = self.__class__()\r
391             self.parent.load_from_string(parts[1])\r
392 \r
393     ##\r
394     # Load the certificate from a file\r
395 \r
396     def load_from_file(self, filename):\r
397         file = open(filename)\r
398         string = file.read()\r
399         self.load_from_string(string)\r
400         self.filename=filename\r
401 \r
402     ##\r
403     # Save the certificate to a string.\r
404     #\r
405     # @param save_parents If save_parents==True, then also save the parent certificates.\r
406 \r
407     def save_to_string(self, save_parents=True):\r
408         string = crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert)\r
409         if save_parents and self.parent:\r
410             string = string + self.parent.save_to_string(save_parents)\r
411         return string\r
412 \r
413     ##\r
414     # Save the certificate to a file.\r
415     # @param save_parents If save_parents==True, then also save the parent certificates.\r
416 \r
417     def save_to_file(self, filename, save_parents=True, filep=None):\r
418         string = self.save_to_string(save_parents=save_parents)\r
419         if filep:\r
420             f = filep\r
421         else:\r
422             f = open(filename, 'w')\r
423         f.write(string)\r
424         f.close()\r
425         self.filename=filename\r
426 \r
427     ##\r
428     # Save the certificate to a random file in /tmp/\r
429     # @param save_parents If save_parents==True, then also save the parent certificates.\r
430     def save_to_random_tmp_file(self, save_parents=True):\r
431         fp, filename = mkstemp(suffix='cert', text=True)\r
432         fp = os.fdopen(fp, "w")\r
433         self.save_to_file(filename, save_parents=True, filep=fp)\r
434         return filename\r
435 \r
436     ##\r
437     # Sets the issuer private key and name\r
438     # @param key Keypair object containing the private key of the issuer\r
439     # @param subject String containing the name of the issuer\r
440     # @param cert (optional) Certificate object containing the name of the issuer\r
441 \r
442     def set_issuer(self, key, subject=None, cert=None):\r
443         self.issuerKey = key\r
444         if subject:\r
445             # it's a mistake to use subject and cert params at the same time\r
446             assert(not cert)\r
447             if isinstance(subject, dict) or isinstance(subject, str):\r
448                 req = crypto.X509Req()\r
449                 reqSubject = req.get_subject()\r
450                 if (isinstance(subject, dict)):\r
451                     for key in reqSubject.keys():\r
452                         setattr(reqSubject, key, subject[key])\r
453                 else:\r
454                     setattr(reqSubject, "CN", subject)\r
455                 subject = reqSubject\r
456                 # subject is not valid once req is out of scope, so save req\r
457                 self.issuerReq = req\r
458         if cert:\r
459             # if a cert was supplied, then get the subject from the cert\r
460             subject = cert.cert.get_subject()\r
461         assert(subject)\r
462         self.issuerSubject = subject\r
463 \r
464     ##\r
465     # Get the issuer name\r
466 \r
467     def get_issuer(self, which="CN"):\r
468         x = self.cert.get_issuer()\r
469         return getattr(x, which)\r
470 \r
471     ##\r
472     # Set the subject name of the certificate\r
473 \r
474     def set_subject(self, name):\r
475         req = crypto.X509Req()\r
476         subj = req.get_subject()\r
477         if (isinstance(name, dict)):\r
478             for key in name.keys():\r
479                 setattr(subj, key, name[key])\r
480         else:\r
481             setattr(subj, "CN", name)\r
482         self.cert.set_subject(subj)\r
483     ##\r
484     # Get the subject name of the certificate\r
485 \r
486     def get_subject(self, which="CN"):\r
487         x = self.cert.get_subject()\r
488         return getattr(x, which)\r
489 \r
490     ##\r
491     # Get the public key of the certificate.\r
492     #\r
493     # @param key Keypair object containing the public key\r
494 \r
495     def set_pubkey(self, key):\r
496         assert(isinstance(key, Keypair))\r
497         self.cert.set_pubkey(key.get_openssl_pkey())\r
498 \r
499     ##\r
500     # Get the public key of the certificate.\r
501     # It is returned in the form of a Keypair object.\r
502 \r
503     def get_pubkey(self):\r
504         m2x509 = X509.load_cert_string(self.save_to_string())\r
505         pkey = Keypair()\r
506         pkey.key = self.cert.get_pubkey()\r
507         pkey.m2key = m2x509.get_pubkey()\r
508         return pkey\r
509 \r
510     def set_intermediate_ca(self, val):\r
511         return self.set_is_ca(val)\r
512 \r
513     # Set whether this cert is for a CA. All signers and only signers should be CAs.\r
514     # The local member starts unset, letting us check that you only set it once\r
515     # @param val Boolean indicating whether this cert is for a CA\r
516     def set_is_ca(self, val):\r
517         if val is None:\r
518             return\r
519 \r
520         if self.isCA != None:\r
521             # Can't double set properties\r
522             raise "Cannot set basicConstraints CA:?? more than once. Was %s, trying to set as %s" % (self.isCA, val)\r
523 \r
524         self.isCA = val\r
525         if val:\r
526             self.add_extension('basicConstraints', 1, 'CA:TRUE')\r
527         else:\r
528             self.add_extension('basicConstraints', 1, 'CA:FALSE')\r
529 \r
530 \r
531 \r
532     ##\r
533     # Add an X509 extension to the certificate. Add_extension can only be called\r
534     # once for a particular extension name, due to limitations in the underlying\r
535     # library.\r
536     #\r
537     # @param name string containing name of extension\r
538     # @param value string containing value of the extension\r
539 \r
540     def add_extension(self, name, critical, value):\r
541         oldExtVal = None\r
542         try:\r
543             oldExtVal = self.get_extension(name)\r
544         except:\r
545             # M2Crypto LookupError when the extension isn't there (yet)\r
546             pass\r
547 \r
548         # This code limits you from adding the extension with the same value\r
549         # The method comment says you shouldn't do this with the same name\r
550         # But actually it (m2crypto) appears to allow you to do this.\r
551         if oldExtVal and oldExtVal == value:\r
552             # don't add this extension again\r
553             # just do nothing as here\r
554             return\r
555         # FIXME: What if they are trying to set with a different value?\r
556         # Is this ever OK? Or should we raise an exception?\r
557 #        elif oldExtVal:\r
558 #            raise "Cannot add extension %s which had val %s with new val %s" % (name, oldExtVal, value)\r
559 \r
560         ext = crypto.X509Extension (name, critical, value)\r
561         self.cert.add_extensions([ext])\r
562 \r
563     ##\r
564     # Get an X509 extension from the certificate\r
565 \r
566     def get_extension(self, name):\r
567 \r
568         # pyOpenSSL does not have a way to get extensions\r
569         m2x509 = X509.load_cert_string(self.save_to_string())\r
570         value = m2x509.get_ext(name).get_value()\r
571 \r
572         return value\r
573 \r
574     ##\r
575     # Set_data is a wrapper around add_extension. It stores the parameter str in\r
576     # the X509 subject_alt_name extension. Set_data can only be called once, due\r
577     # to limitations in the underlying library.\r
578 \r
579     def set_data(self, str, field='subjectAltName'):\r
580         # pyOpenSSL only allows us to add extensions, so if we try to set the\r
581         # same extension more than once, it will not work\r
582         if self.data.has_key(field):\r
583             raise "Cannot set ", field, " more than once"\r
584         self.data[field] = str\r
585         self.add_extension(field, 0, str)\r
586 \r
587     ##\r
588     # Return the data string that was previously set with set_data\r
589 \r
590     def get_data(self, field='subjectAltName'):\r
591         if self.data.has_key(field):\r
592             return self.data[field]\r
593 \r
594         try:\r
595             uri = self.get_extension(field)\r
596             self.data[field] = uri\r
597         except LookupError:\r
598             return None\r
599 \r
600         return self.data[field]\r
601 \r
602     ##\r
603     # Sign the certificate using the issuer private key and issuer subject previous set with set_issuer().\r
604 \r
605     def sign(self):\r
606         logger.debug('certificate.sign')\r
607         assert self.cert != None\r
608         assert self.issuerSubject != None\r
609         assert self.issuerKey != None\r
610         self.cert.set_issuer(self.issuerSubject)\r
611         self.cert.sign(self.issuerKey.get_openssl_pkey(), self.digest)\r
612 \r
613     ##\r
614     # Verify the authenticity of a certificate.\r
615     # @param pkey is a Keypair object representing a public key. If Pkey\r
616     #     did not sign the certificate, then an exception will be thrown.\r
617 \r
618     def verify(self, pkey):\r
619         # pyOpenSSL does not have a way to verify signatures\r
620         m2x509 = X509.load_cert_string(self.save_to_string())\r
621         m2pkey = pkey.get_m2_pkey()\r
622         # verify it\r
623         return m2x509.verify(m2pkey)\r
624 \r
625         # XXX alternatively, if openssl has been patched, do the much simpler:\r
626         # try:\r
627         #   self.cert.verify(pkey.get_openssl_key())\r
628         #   return 1\r
629         # except:\r
630         #   return 0\r
631 \r
632     ##\r
633     # Return True if pkey is identical to the public key that is contained in the certificate.\r
634     # @param pkey Keypair object\r
635 \r
636     def is_pubkey(self, pkey):\r
637         return self.get_pubkey().is_same(pkey)\r
638 \r
639     ##\r
640     # Given a certificate cert, verify that this certificate was signed by the\r
641     # public key contained in cert. Throw an exception otherwise.\r
642     #\r
643     # @param cert certificate object\r
644 \r
645     def is_signed_by_cert(self, cert):\r
646         k = cert.get_pubkey()\r
647         result = self.verify(k)\r
648         return result\r
649 \r
650     ##\r
651     # Set the parent certficiate.\r
652     #\r
653     # @param p certificate object.\r
654 \r
655     def set_parent(self, p):\r
656         self.parent = p\r
657 \r
658     ##\r
659     # Return the certificate object of the parent of this certificate.\r
660 \r
661     def get_parent(self):\r
662         return self.parent\r
663 \r
664     ##\r
665     # Verification examines a chain of certificates to ensure that each parent\r
666     # signs the child, and that some certificate in the chain is signed by a\r
667     # trusted certificate.\r
668     #\r
669     # Verification is a basic recursion: <pre>\r
670     #     if this_certificate was signed by trusted_certs:\r
671     #         return\r
672     #     else\r
673     #         return verify_chain(parent, trusted_certs)\r
674     # </pre>\r
675     #\r
676     # At each recursion, the parent is tested to ensure that it did sign the\r
677     # child. If a parent did not sign a child, then an exception is thrown. If\r
678     # the bottom of the recursion is reached and the certificate does not match\r
679     # a trusted root, then an exception is thrown.\r
680     # Also require that parents are CAs.\r
681     #\r
682     # @param Trusted_certs is a list of certificates that are trusted.\r
683     #\r
684 \r
685     def verify_chain(self, trusted_certs = None):\r
686         # Verify a chain of certificates. Each certificate must be signed by\r
687         # the public key contained in it's parent. The chain is recursed\r
688         # until a certificate is found that is signed by a trusted root.\r
689 \r
690         # verify expiration time\r
691         if self.cert.has_expired():\r
692             logger.debug("verify_chain: NO our certificate has expired")\r
693             raise CertExpired(self.get_subject(), "client cert")   \r
694         \r
695         # if this cert is signed by a trusted_cert, then we are set\r
696         for trusted_cert in trusted_certs:\r
697             if self.is_signed_by_cert(trusted_cert):\r
698                 # verify expiration of trusted_cert ?\r
699                 if not trusted_cert.cert.has_expired():\r
700                     logger.debug("verify_chain: YES cert %s signed by trusted cert %s"%(\r
701                             self.get_subject(), trusted_cert.get_subject()))\r
702                     return trusted_cert\r
703                 else:\r
704                     logger.debug("verify_chain: NO cert %s is signed by trusted_cert %s, but this is expired..."%(\r
705                             self.get_subject(),trusted_cert.get_subject()))\r
706                     raise CertExpired(self.get_subject(),"trusted_cert %s"%trusted_cert.get_subject())\r
707 \r
708         # if there is no parent, then no way to verify the chain\r
709         if not self.parent:\r
710             logger.debug("verify_chain: NO %s has no parent and is not in trusted roots"%self.get_subject())\r
711             raise CertMissingParent(self.get_subject())\r
712 \r
713         # if it wasn't signed by the parent...\r
714         if not self.is_signed_by_cert(self.parent):\r
715             logger.debug("verify_chain: NO %s is not signed by parent"%self.get_subject())\r
716             return CertNotSignedByParent(self.get_subject())\r
717 \r
718         # Confirm that the parent is a CA. Only CAs can be trusted as\r
719         # signers.\r
720         # Note that trusted roots are not parents, so don't need to be\r
721         # CAs.\r
722         # Ugly - cert objects aren't parsed so we need to read the\r
723         # extension and hope there are no other basicConstraints\r
724         if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):\r
725             logger.warn("verify_chain: cert %s's parent %s is not a CA" % (self.get_subject(), self.parent.get_subject()))\r
726             return CertNotSignedByParent(self.get_subject())\r
727 \r
728         # if the parent isn't verified...\r
729         logger.debug("verify_chain: .. %s, -> verifying parent %s"%(self.get_subject(),self.parent.get_subject()))\r
730         self.parent.verify_chain(trusted_certs)\r
731 \r
732         return\r
733 \r
734     ### more introspection\r
735     def get_extensions(self):\r
736         # pyOpenSSL does not have a way to get extensions\r
737         triples=[]\r
738         m2x509 = X509.load_cert_string(self.save_to_string())\r
739         nb_extensions=m2x509.get_ext_count()\r
740         logger.debug("X509 had %d extensions"%nb_extensions)\r
741         for i in range(nb_extensions):\r
742             ext=m2x509.get_ext_at(i)\r
743             triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) )\r
744         return triples\r
745 \r
746     def get_data_names(self):\r
747         return self.data.keys()\r
748 \r
749     def get_all_datas (self):\r
750         triples=self.get_extensions()\r
751         for name in self.get_data_names():\r
752             triples.append( (name,self.get_data(name),'data',) )\r
753         return triples\r
754 \r
755     # only informative\r
756     def get_filename(self):\r
757         return getattr(self,'filename',None)\r
758 \r
759     def dump (self, *args, **kwargs):\r
760         print self.dump_string(*args, **kwargs)\r
761 \r
762     def dump_string (self,show_extensions=False):\r
763         result = ""\r
764         result += "CERTIFICATE for %s\n"%self.get_subject()\r
765         result += "Issued by %s\n"%self.get_issuer()\r
766         filename=self.get_filename()\r
767         if filename: result += "Filename %s\n"%filename\r
768         if show_extensions:\r
769             all_datas=self.get_all_datas()\r
770             result += " has %d extensions/data attached"%len(all_datas)\r
771             for (n,v,c) in all_datas:\r
772                 if c=='data':\r
773                     result += "   data: %s=%s\n"%(n,v)\r
774                 else:\r
775                     result += "    ext: %s (crit=%s)=<<<%s>>>\n"%(n,c,v)\r
776         return result\r