bcec9d61e4d7b42c8c106cb412b254c3537a685b
[sfa.git] / sfa / trust / certificate.py
1 #----------------------------------------------------------------------\r
2 # Copyright (c) 2008 Board of Trustees, Princeton University\r
3 #\r
4 # Permission is hereby granted, free of charge, to any person obtaining\r
5 # a copy of this software and/or hardware specification (the "Work") to\r
6 # deal in the Work without restriction, including without limitation the\r
7 # rights to use, copy, modify, merge, publish, distribute, sublicense,\r
8 # and/or sell copies of the Work, and to permit persons to whom the Work\r
9 # is furnished to do so, subject to the following conditions:\r
10 #\r
11 # The above copyright notice and this permission notice shall be\r
12 # included in all copies or substantial portions of the Work.\r
13 #\r
14 # THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS \r
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF \r
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND \r
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT \r
18 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, \r
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, \r
20 # OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS \r
21 # IN THE WORK.\r
22 #----------------------------------------------------------------------\r
23 \r
24 ##\r
25 # SFA uses two crypto libraries: pyOpenSSL and M2Crypto to implement\r
26 # the necessary crypto functionality. Ideally just one of these libraries\r
27 # would be used, but unfortunately each of these libraries is independently\r
28 # lacking. The pyOpenSSL library is missing many necessary functions, and\r
29 # the M2Crypto library has crashed inside of some of the functions. The\r
30 # design decision is to use pyOpenSSL whenever possible as it seems more\r
31 # stable, and only use M2Crypto for those functions that are not possible\r
32 # in pyOpenSSL.\r
33 #\r
34 # This module exports two classes: Keypair and Certificate.\r
35 ##\r
36 #\r
37 \r
38 import functools\r
39 import os\r
40 import tempfile\r
41 import base64\r
42 import traceback\r
43 from tempfile import mkstemp\r
44 \r
45 from OpenSSL import crypto\r
46 import M2Crypto\r
47 from M2Crypto import X509\r
48 \r
49 from sfa.util.sfalogging import logger\r
50 from sfa.util.xrn import urn_to_hrn\r
51 from sfa.util.faults import *\r
52 from sfa.util.sfalogging import logger\r
53 \r
54 glo_passphrase_callback = None\r
55 \r
56 ##\r
57 # A global callback msy be implemented for requesting passphrases from the\r
58 # user. The function will be called with three arguments:\r
59 #\r
60 #    keypair_obj: the keypair object that is calling the passphrase\r
61 #    string: the string containing the private key that's being loaded\r
62 #    x: unknown, appears to be 0, comes from pyOpenSSL and/or m2crypto\r
63 #\r
64 # The callback should return a string containing the passphrase.\r
65 \r
66 def set_passphrase_callback(callback_func):\r
67     global glo_passphrase_callback\r
68 \r
69     glo_passphrase_callback = callback_func\r
70 \r
71 ##\r
72 # Sets a fixed passphrase.\r
73 \r
74 def set_passphrase(passphrase):\r
75     set_passphrase_callback( lambda k,s,x: passphrase )\r
76 \r
77 ##\r
78 # Check to see if a passphrase works for a particular private key string.\r
79 # Intended to be used by passphrase callbacks for input validation.\r
80 \r
81 def test_passphrase(string, passphrase):\r
82     try:\r
83         crypto.load_privatekey(crypto.FILETYPE_PEM, string, (lambda x: passphrase))\r
84         return True\r
85     except:\r
86         return False\r
87 \r
88 def convert_public_key(key):\r
89     keyconvert_path = "/usr/bin/keyconvert.py"\r
90     if not os.path.isfile(keyconvert_path):\r
91         raise IOError, "Could not find keyconvert in %s" % keyconvert_path\r
92 \r
93     # we can only convert rsa keys\r
94     if "ssh-dss" in key:\r
95         return None\r
96 \r
97     (ssh_f, ssh_fn) = tempfile.mkstemp()\r
98     ssl_fn = tempfile.mktemp()\r
99     os.write(ssh_f, key)\r
100     os.close(ssh_f)\r
101 \r
102     cmd = keyconvert_path + " " + ssh_fn + " " + ssl_fn\r
103     os.system(cmd)\r
104 \r
105     # this check leaves the temporary file containing the public key so\r
106     # that it can be expected to see why it failed.\r
107     # TODO: for production, cleanup the temporary files\r
108     if not os.path.exists(ssl_fn):\r
109         return None\r
110 \r
111     k = Keypair()\r
112     try:\r
113         k.load_pubkey_from_file(ssl_fn)\r
114     except:\r
115         logger.log_exc("convert_public_key caught exception")\r
116         k = None\r
117 \r
118     # remove the temporary files\r
119     os.remove(ssh_fn)\r
120     os.remove(ssl_fn)\r
121 \r
122     return k\r
123 \r
124 ##\r
125 # Public-private key pairs are implemented by the Keypair class.\r
126 # A Keypair object may represent both a public and private key pair, or it\r
127 # may represent only a public key (this usage is consistent with OpenSSL).\r
128 \r
129 class Keypair:\r
130     key = None       # public/private keypair\r
131     m2key = None     # public key (m2crypto format)\r
132 \r
133     ##\r
134     # Creates a Keypair object\r
135     # @param create If create==True, creates a new public/private key and\r
136     #     stores it in the object\r
137     # @param string If string!=None, load the keypair from the string (PEM)\r
138     # @param filename If filename!=None, load the keypair from the file\r
139 \r
140     def __init__(self, create=False, string=None, filename=None):\r
141         if create:\r
142             self.create()\r
143         if string:\r
144             self.load_from_string(string)\r
145         if filename:\r
146             self.load_from_file(filename)\r
147 \r
148     ##\r
149     # Create a RSA public/private key pair and store it inside the keypair object\r
150 \r
151     def create(self):\r
152         self.key = crypto.PKey()\r
153         self.key.generate_key(crypto.TYPE_RSA, 1024)\r
154 \r
155     ##\r
156     # Save the private key to a file\r
157     # @param filename name of file to store the keypair in\r
158 \r
159     def save_to_file(self, filename):\r
160         open(filename, 'w').write(self.as_pem())\r
161         self.filename=filename\r
162 \r
163     ##\r
164     # Load the private key from a file. Implicity the private key includes the public key.\r
165 \r
166     def load_from_file(self, filename):\r
167         self.filename=filename\r
168         buffer = open(filename, 'r').read()\r
169         self.load_from_string(buffer)\r
170 \r
171     ##\r
172     # Load the private key from a string. Implicitly the private key includes the public key.\r
173 \r
174     def load_from_string(self, string):\r
175         if glo_passphrase_callback:\r
176             self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string) )\r
177             self.m2key = M2Crypto.EVP.load_key_string(string, functools.partial(glo_passphrase_callback, self, string) )\r
178         else:\r
179             self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string)\r
180             self.m2key = M2Crypto.EVP.load_key_string(string)\r
181 \r
182     ##\r
183     #  Load the public key from a string. No private key is loaded.\r
184 \r
185     def load_pubkey_from_file(self, filename):\r
186         # load the m2 public key\r
187         m2rsakey = M2Crypto.RSA.load_pub_key(filename)\r
188         self.m2key = M2Crypto.EVP.PKey()\r
189         self.m2key.assign_rsa(m2rsakey)\r
190 \r
191         # create an m2 x509 cert\r
192         m2name = M2Crypto.X509.X509_Name()\r
193         m2name.add_entry_by_txt(field="CN", type=0x1001, entry="junk", len=-1, loc=-1, set=0)\r
194         m2x509 = M2Crypto.X509.X509()\r
195         m2x509.set_pubkey(self.m2key)\r
196         m2x509.set_serial_number(0)\r
197         m2x509.set_issuer_name(m2name)\r
198         m2x509.set_subject_name(m2name)\r
199         ASN1 = M2Crypto.ASN1.ASN1_UTCTIME()\r
200         ASN1.set_time(500)\r
201         m2x509.set_not_before(ASN1)\r
202         m2x509.set_not_after(ASN1)\r
203         # x509v3 so it can have extensions\r
204         # prob not necc since this cert itself is junk but still...\r
205         m2x509.set_version(2)\r
206         junk_key = Keypair(create=True)\r
207         m2x509.sign(pkey=junk_key.get_m2_pkey(), md="sha1")\r
208 \r
209         # convert the m2 x509 cert to a pyopenssl x509\r
210         m2pem = m2x509.as_pem()\r
211         pyx509 = crypto.load_certificate(crypto.FILETYPE_PEM, m2pem)\r
212 \r
213         # get the pyopenssl pkey from the pyopenssl x509\r
214         self.key = pyx509.get_pubkey()\r
215         self.filename=filename\r
216 \r
217     ##\r
218     # Load the public key from a string. No private key is loaded.\r
219 \r
220     def load_pubkey_from_string(self, string):\r
221         (f, fn) = tempfile.mkstemp()\r
222         os.write(f, string)\r
223         os.close(f)\r
224         self.load_pubkey_from_file(fn)\r
225         os.remove(fn)\r
226 \r
227     ##\r
228     # Return the private key in PEM format.\r
229 \r
230     def as_pem(self):\r
231         return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.key)\r
232 \r
233     ##\r
234     # Return an M2Crypto key object\r
235 \r
236     def get_m2_pkey(self):\r
237         if not self.m2key:\r
238             self.m2key = M2Crypto.EVP.load_key_string(self.as_pem())\r
239         return self.m2key\r
240 \r
241     ##\r
242     # Returns a string containing the public key represented by this object.\r
243 \r
244     def get_pubkey_string(self):\r
245         m2pkey = self.get_m2_pkey()\r
246         return base64.b64encode(m2pkey.as_der())\r
247 \r
248     ##\r
249     # Return an OpenSSL pkey object\r
250 \r
251     def get_openssl_pkey(self):\r
252         return self.key\r
253 \r
254     ##\r
255     # Given another Keypair object, return TRUE if the two keys are the same.\r
256 \r
257     def is_same(self, pkey):\r
258         return self.as_pem() == pkey.as_pem()\r
259 \r
260     def sign_string(self, data):\r
261         k = self.get_m2_pkey()\r
262         k.sign_init()\r
263         k.sign_update(data)\r
264         return base64.b64encode(k.sign_final())\r
265 \r
266     def verify_string(self, data, sig):\r
267         k = self.get_m2_pkey()\r
268         k.verify_init()\r
269         k.verify_update(data)\r
270         return M2Crypto.m2.verify_final(k.ctx, base64.b64decode(sig), k.pkey)\r
271 \r
272     def compute_hash(self, value):\r
273         return self.sign_string(str(value))\r
274 \r
275     # only informative\r
276     def get_filename(self):\r
277         return getattr(self,'filename',None)\r
278 \r
279     def dump (self, *args, **kwargs):\r
280         print self.dump_string(*args, **kwargs)\r
281 \r
282     def dump_string (self):\r
283         result=""\r
284         result += "KEYPAIR: pubkey=%40s..."%self.get_pubkey_string()\r
285         filename=self.get_filename()\r
286         if filename: result += "Filename %s\n"%filename\r
287         return result\r
288 \r
289 ##\r
290 # The certificate class implements a general purpose X509 certificate, making\r
291 # use of the appropriate pyOpenSSL or M2Crypto abstractions. It also adds\r
292 # several addition features, such as the ability to maintain a chain of\r
293 # parent certificates, and storage of application-specific data.\r
294 #\r
295 # Certificates include the ability to maintain a chain of parents. Each\r
296 # certificate includes a pointer to it's parent certificate. When loaded\r
297 # from a file or a string, the parent chain will be automatically loaded.\r
298 # When saving a certificate to a file or a string, the caller can choose\r
299 # whether to save the parent certificates as well.\r
300 \r
301 class Certificate:\r
302     digest = "md5"\r
303 \r
304     cert = None\r
305     issuerKey = None\r
306     issuerSubject = None\r
307     parent = None\r
308     isCA = None # will be a boolean once set\r
309 \r
310     separator="-----parent-----"\r
311 \r
312     ##\r
313     # Create a certificate object.\r
314     #\r
315     # @param lifeDays life of cert in days - default is 1825==5 years\r
316     # @param create If create==True, then also create a blank X509 certificate.\r
317     # @param subject If subject!=None, then create a blank certificate and set\r
318     #     it's subject name.\r
319     # @param string If string!=None, load the certficate from the string.\r
320     # @param filename If filename!=None, load the certficiate from the file.\r
321     # @param isCA If !=None, set whether this cert is for a CA\r
322 \r
323     def __init__(self, lifeDays=1825, create=False, subject=None, string=None, filename=None, isCA=None):\r
324         self.data = {}\r
325         if create or subject:\r
326             self.create(lifeDays)\r
327         if subject:\r
328             self.set_subject(subject)\r
329         if string:\r
330             self.load_from_string(string)\r
331         if filename:\r
332             self.load_from_file(filename)\r
333 \r
334         # Set the CA bit if a value was supplied\r
335         if isCA != None:\r
336             self.set_is_ca(isCA)\r
337 \r
338     # Create a blank X509 certificate and store it in this object.\r
339 \r
340     def create(self, lifeDays=1825):\r
341         self.cert = crypto.X509()\r
342         # FIXME: Use different serial #s\r
343         self.cert.set_serial_number(3)\r
344         self.cert.gmtime_adj_notBefore(0) # 0 means now\r
345         self.cert.gmtime_adj_notAfter(lifeDays*60*60*24) # five years is default\r
346         self.cert.set_version(2) # x509v3 so it can have extensions\r
347 \r
348 \r
349     ##\r
350     # Given a pyOpenSSL X509 object, store that object inside of this\r
351     # certificate object.\r
352 \r
353     def load_from_pyopenssl_x509(self, x509):\r
354         self.cert = x509\r
355 \r
356     ##\r
357     # Load the certificate from a string\r
358 \r
359     def load_from_string(self, string):\r
360         # if it is a chain of multiple certs, then split off the first one and\r
361         # load it (support for the ---parent--- tag as well as normal chained certs)\r
362 \r
363         string = string.strip()\r
364         \r
365         # If it's not in proper PEM format, wrap it\r
366         if string.count('-----BEGIN CERTIFICATE') == 0:\r
367             string = '-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----' % string\r
368 \r
369         # If there is a PEM cert in there, but there is some other text first\r
370         # such as the text of the certificate, skip the text\r
371         beg = string.find('-----BEGIN CERTIFICATE')\r
372         if beg > 0:\r
373             # skipping over non cert beginning                                                                                                              \r
374             string = string[beg:]\r
375 \r
376         parts = []\r
377 \r
378         if string.count('-----BEGIN CERTIFICATE-----') > 1 and \\r
379                string.count(Certificate.separator) == 0:\r
380             parts = string.split('-----END CERTIFICATE-----',1)\r
381             parts[0] += '-----END CERTIFICATE-----'\r
382         else:\r
383             parts = string.split(Certificate.separator, 1)\r
384 \r
385         self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, parts[0])\r
386 \r
387         # if there are more certs, then create a parent and let the parent load\r
388         # itself from the remainder of the string\r
389         if len(parts) > 1 and parts[1] != '':\r
390             self.parent = self.__class__()\r
391             self.parent.load_from_string(parts[1])\r
392 \r
393     ##\r
394     # Load the certificate from a file\r
395 \r
396     def load_from_file(self, filename):\r
397         file = open(filename)\r
398         string = file.read()\r
399         self.load_from_string(string)\r
400         self.filename=filename\r
401 \r
402     ##\r
403     # Save the certificate to a string.\r
404     #\r
405     # @param save_parents If save_parents==True, then also save the parent certificates.\r
406 \r
407     def save_to_string(self, save_parents=True):\r
408         string = crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert)\r
409         if save_parents and self.parent:\r
410             string = string + self.parent.save_to_string(save_parents)\r
411         return string\r
412 \r
413     ##\r
414     # Save the certificate to a file.\r
415     # @param save_parents If save_parents==True, then also save the parent certificates.\r
416 \r
417     def save_to_file(self, filename, save_parents=True, filep=None):\r
418         string = self.save_to_string(save_parents=save_parents)\r
419         if filep:\r
420             f = filep\r
421         else:\r
422             f = open(filename, 'w')\r
423         f.write(string)\r
424         f.close()\r
425         self.filename=filename\r
426 \r
427     ##\r
428     # Save the certificate to a random file in /tmp/\r
429     # @param save_parents If save_parents==True, then also save the parent certificates.\r
430     def save_to_random_tmp_file(self, save_parents=True):\r
431         fp, filename = mkstemp(suffix='cert', text=True)\r
432         fp = os.fdopen(fp, "w")\r
433         self.save_to_file(filename, save_parents=True, filep=fp)\r
434         return filename\r
435 \r
436     ##\r
437     # Sets the issuer private key and name\r
438     # @param key Keypair object containing the private key of the issuer\r
439     # @param subject String containing the name of the issuer\r
440     # @param cert (optional) Certificate object containing the name of the issuer\r
441 \r
442     def set_issuer(self, key, subject=None, cert=None):\r
443         self.issuerKey = key\r
444         if subject:\r
445             # it's a mistake to use subject and cert params at the same time\r
446             assert(not cert)\r
447             if isinstance(subject, dict) or isinstance(subject, str):\r
448                 req = crypto.X509Req()\r
449                 reqSubject = req.get_subject()\r
450                 if (isinstance(subject, dict)):\r
451                     for key in reqSubject.keys():\r
452                         setattr(reqSubject, key, subject[key])\r
453                 else:\r
454                     setattr(reqSubject, "CN", subject)\r
455                 subject = reqSubject\r
456                 # subject is not valid once req is out of scope, so save req\r
457                 self.issuerReq = req\r
458         if cert:\r
459             # if a cert was supplied, then get the subject from the cert\r
460             subject = cert.cert.get_subject()\r
461         assert(subject)\r
462         self.issuerSubject = subject\r
463 \r
464     ##\r
465     # Get the issuer name\r
466 \r
467     def get_issuer(self, which="CN"):\r
468         x = self.cert.get_issuer()\r
469         return getattr(x, which)\r
470 \r
471     ##\r
472     # Set the subject name of the certificate\r
473 \r
474     def set_subject(self, name):\r
475         req = crypto.X509Req()\r
476         subj = req.get_subject()\r
477         if (isinstance(name, dict)):\r
478             for key in name.keys():\r
479                 setattr(subj, key, name[key])\r
480         else:\r
481             setattr(subj, "CN", name)\r
482         self.cert.set_subject(subj)\r
483 \r
484     ##\r
485     # Get the subject name of the certificate\r
486 \r
487     def get_subject(self, which="CN"):\r
488         x = self.cert.get_subject()\r
489         return getattr(x, which)\r
490 \r
491     ##\r
492     # Get a pretty-print subject name of the certificate\r
493 \r
494     def get_printable_subject(self):\r
495         x = self.cert.get_subject()\r
496         return "[ OU: %s, CN: %s, SubjectAltName: %s ]" % (getattr(x, "OU"), getattr(x, "CN"), self.get_data())\r
497 \r
498     ##\r
499     # Get the public key of the certificate.\r
500     #\r
501     # @param key Keypair object containing the public key\r
502 \r
503     def set_pubkey(self, key):\r
504         assert(isinstance(key, Keypair))\r
505         self.cert.set_pubkey(key.get_openssl_pkey())\r
506 \r
507     ##\r
508     # Get the public key of the certificate.\r
509     # It is returned in the form of a Keypair object.\r
510 \r
511     def get_pubkey(self):\r
512         m2x509 = X509.load_cert_string(self.save_to_string())\r
513         pkey = Keypair()\r
514         pkey.key = self.cert.get_pubkey()\r
515         pkey.m2key = m2x509.get_pubkey()\r
516         return pkey\r
517 \r
518     def set_intermediate_ca(self, val):\r
519         return self.set_is_ca(val)\r
520 \r
521     # Set whether this cert is for a CA. All signers and only signers should be CAs.\r
522     # The local member starts unset, letting us check that you only set it once\r
523     # @param val Boolean indicating whether this cert is for a CA\r
524     def set_is_ca(self, val):\r
525         if val is None:\r
526             return\r
527 \r
528         if self.isCA != None:\r
529             # Can't double set properties\r
530             raise "Cannot set basicConstraints CA:?? more than once. Was %s, trying to set as %s" % (self.isCA, val)\r
531 \r
532         self.isCA = val\r
533         if val:\r
534             self.add_extension('basicConstraints', 1, 'CA:TRUE')\r
535         else:\r
536             self.add_extension('basicConstraints', 1, 'CA:FALSE')\r
537 \r
538 \r
539 \r
540     ##\r
541     # Add an X509 extension to the certificate. Add_extension can only be called\r
542     # once for a particular extension name, due to limitations in the underlying\r
543     # library.\r
544     #\r
545     # @param name string containing name of extension\r
546     # @param value string containing value of the extension\r
547 \r
548     def add_extension(self, name, critical, value):\r
549         oldExtVal = None\r
550         try:\r
551             oldExtVal = self.get_extension(name)\r
552         except:\r
553             # M2Crypto LookupError when the extension isn't there (yet)\r
554             pass\r
555 \r
556         # This code limits you from adding the extension with the same value\r
557         # The method comment says you shouldn't do this with the same name\r
558         # But actually it (m2crypto) appears to allow you to do this.\r
559         if oldExtVal and oldExtVal == value:\r
560             # don't add this extension again\r
561             # just do nothing as here\r
562             return\r
563         # FIXME: What if they are trying to set with a different value?\r
564         # Is this ever OK? Or should we raise an exception?\r
565 #        elif oldExtVal:\r
566 #            raise "Cannot add extension %s which had val %s with new val %s" % (name, oldExtVal, value)\r
567 \r
568         ext = crypto.X509Extension (name, critical, value)\r
569         self.cert.add_extensions([ext])\r
570 \r
571     ##\r
572     # Get an X509 extension from the certificate\r
573 \r
574     def get_extension(self, name):\r
575 \r
576         # pyOpenSSL does not have a way to get extensions\r
577         m2x509 = X509.load_cert_string(self.save_to_string())\r
578         value = m2x509.get_ext(name).get_value()\r
579 \r
580         return value\r
581 \r
582     ##\r
583     # Set_data is a wrapper around add_extension. It stores the parameter str in\r
584     # the X509 subject_alt_name extension. Set_data can only be called once, due\r
585     # to limitations in the underlying library.\r
586 \r
587     def set_data(self, str, field='subjectAltName'):\r
588         # pyOpenSSL only allows us to add extensions, so if we try to set the\r
589         # same extension more than once, it will not work\r
590         if self.data.has_key(field):\r
591             raise "Cannot set ", field, " more than once"\r
592         self.data[field] = str\r
593         self.add_extension(field, 0, str)\r
594 \r
595     ##\r
596     # Return the data string that was previously set with set_data\r
597 \r
598     def get_data(self, field='subjectAltName'):\r
599         if self.data.has_key(field):\r
600             return self.data[field]\r
601 \r
602         try:\r
603             uri = self.get_extension(field)\r
604             self.data[field] = uri\r
605         except LookupError:\r
606             return None\r
607 \r
608         return self.data[field]\r
609 \r
610     ##\r
611     # Sign the certificate using the issuer private key and issuer subject previous set with set_issuer().\r
612 \r
613     def sign(self):\r
614         logger.debug('certificate.sign')\r
615         assert self.cert != None\r
616         assert self.issuerSubject != None\r
617         assert self.issuerKey != None\r
618         self.cert.set_issuer(self.issuerSubject)\r
619         self.cert.sign(self.issuerKey.get_openssl_pkey(), self.digest)\r
620 \r
621     ##\r
622     # Verify the authenticity of a certificate.\r
623     # @param pkey is a Keypair object representing a public key. If Pkey\r
624     #     did not sign the certificate, then an exception will be thrown.\r
625 \r
626     def verify(self, pkey):\r
627         # pyOpenSSL does not have a way to verify signatures\r
628         m2x509 = X509.load_cert_string(self.save_to_string())\r
629         m2pkey = pkey.get_m2_pkey()\r
630         # verify it\r
631         return m2x509.verify(m2pkey)\r
632 \r
633         # XXX alternatively, if openssl has been patched, do the much simpler:\r
634         # try:\r
635         #   self.cert.verify(pkey.get_openssl_key())\r
636         #   return 1\r
637         # except:\r
638         #   return 0\r
639 \r
640     ##\r
641     # Return True if pkey is identical to the public key that is contained in the certificate.\r
642     # @param pkey Keypair object\r
643 \r
644     def is_pubkey(self, pkey):\r
645         return self.get_pubkey().is_same(pkey)\r
646 \r
647     ##\r
648     # Given a certificate cert, verify that this certificate was signed by the\r
649     # public key contained in cert. Throw an exception otherwise.\r
650     #\r
651     # @param cert certificate object\r
652 \r
653     def is_signed_by_cert(self, cert):\r
654         k = cert.get_pubkey()\r
655         result = self.verify(k)\r
656         return result\r
657 \r
658     ##\r
659     # Set the parent certficiate.\r
660     #\r
661     # @param p certificate object.\r
662 \r
663     def set_parent(self, p):\r
664         self.parent = p\r
665 \r
666     ##\r
667     # Return the certificate object of the parent of this certificate.\r
668 \r
669     def get_parent(self):\r
670         return self.parent\r
671 \r
672     ##\r
673     # Verification examines a chain of certificates to ensure that each parent\r
674     # signs the child, and that some certificate in the chain is signed by a\r
675     # trusted certificate.\r
676     #\r
677     # Verification is a basic recursion: <pre>\r
678     #     if this_certificate was signed by trusted_certs:\r
679     #         return\r
680     #     else\r
681     #         return verify_chain(parent, trusted_certs)\r
682     # </pre>\r
683     #\r
684     # At each recursion, the parent is tested to ensure that it did sign the\r
685     # child. If a parent did not sign a child, then an exception is thrown. If\r
686     # the bottom of the recursion is reached and the certificate does not match\r
687     # a trusted root, then an exception is thrown.\r
688     # Also require that parents are CAs.\r
689     #\r
690     # @param Trusted_certs is a list of certificates that are trusted.\r
691     #\r
692 \r
693     def verify_chain(self, trusted_certs = None):\r
694         # Verify a chain of certificates. Each certificate must be signed by\r
695         # the public key contained in it's parent. The chain is recursed\r
696         # until a certificate is found that is signed by a trusted root.\r
697 \r
698         # verify expiration time\r
699         if self.cert.has_expired():\r
700             logger.debug("verify_chain: NO, Certificate %s has expired" % self.get_printable_subject())\r
701             raise CertExpired(self.get_printable_subject(), "client cert")\r
702 \r
703         # if this cert is signed by a trusted_cert, then we are set\r
704         for trusted_cert in trusted_certs:\r
705             if self.is_signed_by_cert(trusted_cert):\r
706                 # verify expiration of trusted_cert ?\r
707                 if not trusted_cert.cert.has_expired():\r
708                     logger.debug("verify_chain: YES. Cert %s signed by trusted cert %s"%(\r
709                             self.get_printable_subject(), trusted_cert.get_printable_subject()))\r
710                     return trusted_cert\r
711                 else:\r
712                     logger.debug("verify_chain: NO. Cert %s is signed by trusted_cert %s, but that signer is expired..."%(\r
713                             self.get_printable_subject(),trusted_cert.get_printable_subject()))\r
714                     raise CertExpired(self.get_printable_subject()," signer trusted_cert %s"%trusted_cert.get_printable_subject())\r
715 \r
716         # if there is no parent, then no way to verify the chain\r
717         if not self.parent:\r
718             logger.debug("verify_chain: NO. %s has no parent and issuer %s is not in %d trusted roots"%(self.get_printable_subject(), self.get_issuer(), len(trusted_certs)))\r
719             raise CertMissingParent(self.get_printable_subject() + ": Issuer %s not trusted by any of %d trusted roots, and cert has no parent." % (self.get_issuer(), len(trusted_certs)))\r
720 \r
721         # if it wasn't signed by the parent...\r
722         if not self.is_signed_by_cert(self.parent):\r
723             logger.debug("verify_chain: NO. %s is not signed by parent %s, but by %s"%self.get_printable_subject(), self.parent.get_printable_subject(), self.get_issuer())\r
724             raise CertNotSignedByParent(self.get_printable_subject() + ": Parent %s, issuer %s" % (self.parent.get_printable_subject(), self.get_issuer()))\r
725 \r
726         # Confirm that the parent is a CA. Only CAs can be trusted as\r
727         # signers.\r
728         # Note that trusted roots are not parents, so don't need to be\r
729         # CAs.\r
730         # Ugly - cert objects aren't parsed so we need to read the\r
731         # extension and hope there are no other basicConstraints\r
732         if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):\r
733             logger.warn("verify_chain: cert %s's parent %s is not a CA" % (self.get_printable_subject(), self.parent.get_printable_subject()))\r
734             raise CertNotSignedByParent(self.get_printable_subject() + ": Parent %s not a CA" % self.parent.get_printable_subject())\r
735 \r
736         # if the parent isn't verified...\r
737         logger.debug("verify_chain: .. %s, -> verifying parent %s"%(self.get_printable_subject(),self.parent.get_printable_subject()))\r
738         self.parent.verify_chain(trusted_certs)\r
739 \r
740         return\r
741 \r
742     ### more introspection\r
743     def get_extensions(self):\r
744         # pyOpenSSL does not have a way to get extensions\r
745         triples=[]\r
746         m2x509 = X509.load_cert_string(self.save_to_string())\r
747         nb_extensions=m2x509.get_ext_count()\r
748         logger.debug("X509 had %d extensions"%nb_extensions)\r
749         for i in range(nb_extensions):\r
750             ext=m2x509.get_ext_at(i)\r
751             triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) )\r
752         return triples\r
753 \r
754     def get_data_names(self):\r
755         return self.data.keys()\r
756 \r
757     def get_all_datas (self):\r
758         triples=self.get_extensions()\r
759         for name in self.get_data_names():\r
760             triples.append( (name,self.get_data(name),'data',) )\r
761         return triples\r
762 \r
763     # only informative\r
764     def get_filename(self):\r
765         return getattr(self,'filename',None)\r
766 \r
767     def dump (self, *args, **kwargs):\r
768         print self.dump_string(*args, **kwargs)\r
769 \r
770     def dump_string (self,show_extensions=False):\r
771         result = ""\r
772         result += "CERTIFICATE for %s\n"%self.get_printable_subject()\r
773         result += "Issued by %s\n"%self.get_issuer()\r
774         filename=self.get_filename()\r
775         if filename: result += "Filename %s\n"%filename\r
776         if show_extensions:\r
777             all_datas=self.get_all_datas()\r
778             result += " has %d extensions/data attached"%len(all_datas)\r
779             for (n,v,c) in all_datas:\r
780                 if c=='data':\r
781                     result += "   data: %s=%s\n"%(n,v)\r
782                 else:\r
783                     result += "    ext: %s (crit=%s)=<<<%s>>>\n"%(n,c,v)\r
784         return result\r