X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=PLC%2FGPG.py;h=1dcc0cf4db90d5cf384462f28e28ade13d38d742;hb=19d4a01ccf66af9e00914351b3eacd5fc880f988;hp=1652b94a594d6c007e91b233bf9cf3934cb7f2e0;hpb=c4d78c10c1d8134de2e5625832ebddcf63fbb2db;p=plcapi.git diff --git a/PLC/GPG.py b/PLC/GPG.py index 1652b94..1dcc0cf 100644 --- a/PLC/GPG.py +++ b/PLC/GPG.py @@ -7,36 +7,34 @@ # Mark Huang # Copyright (C) 2006 The Trustees of Princeton University # -# $Id: GPG.py,v 1.3 2007/01/08 18:11:54 mlhuang Exp $ -# +import os import xmlrpclib import shutil +from types import StringTypes from StringIO import StringIO -from xml.dom import minidom -from xml.dom.ext import Canonicalize from subprocess import Popen, PIPE, call from tempfile import NamedTemporaryFile, mkdtemp +from lxml import etree from PLC.Faults import * -def canonicalize(methodname, args): +def canonicalize(args, methodname = None, methodresponse = False): """ - Returns a canonicalized XML-RPC representation of the - specified method call. + Returns a canonicalized XML-RPC representation of the specified + method call (methodname != None) or response (methodresponse = + True). """ - xml = xmlrpclib.dumps(args, methodname, encoding = 'utf-8', allow_none = 1) - dom = minidom.parseString(xml) - + xml = xmlrpclib.dumps(args, methodname, methodresponse, encoding = 'utf-8', allow_none = 1) + dom = etree.fromstring(xml) + canonical=etree.tostring(dom) + # pre-f20 version was using Canonicalize from PyXML + # from xml.dom.ext import Canonicalize # Canonicalize(), though it claims to, does not encode unicode # nodes to UTF-8 properly and throws an exception unless you write # the stream to a file object, so just encode it ourselves. - buf = StringIO() - Canonicalize(dom, output = buf) - xml = buf.getvalue().encode('utf-8') - - return xml + return canonical.encode('utf-8') def gpg_export(keyring, armor = True): """ @@ -52,7 +50,7 @@ def gpg_export(keyring, armor = True): if armor: args.append("--armor") - p = Popen(args, stdin = PIPE, stdout = PIPE, stderr = PIPE) + p = Popen(args, stdin = PIPE, stdout = PIPE, stderr = PIPE, close_fds = True) export = p.stdout.read() err = p.stderr.read() rc = p.wait() @@ -65,21 +63,40 @@ def gpg_export(keyring, armor = True): return export -def gpg_sign(methodname, args, secret_keyring, keyring): +def gpg_sign(args, secret_keyring, keyring, methodname = None, methodresponse = False, detach_sign = True): """ - Signs the specified method call using the specified keyring files. + Signs the specified method call (methodname != None) or response + (methodresponse == True) using the specified GPG keyring files. If + args is not a tuple representing the arguments to the method call + or the method response value, then it should be a string + representing a generic message to sign (detach_sign == True) or + sign/encrypt (detach_sign == False) specified). Returns the + detached signature (detach_sign == True) or signed/encrypted + message (detach_sign == False). """ - message = canonicalize(methodname, args) + # Accept either an opaque string blob or a Python tuple + if isinstance(args, StringTypes): + message = args + elif isinstance(args, tuple): + message = canonicalize(args, methodname, methodresponse) + # Use temporary trustdb homedir = mkdtemp() - p = Popen(["gpg", "--batch", "--no-tty", - "--homedir", homedir, - "--no-default-keyring", - "--secret-keyring", secret_keyring, - "--keyring", keyring, - "--detach-sign", "--armor"], - stdin = PIPE, stdout = PIPE, stderr = PIPE) + + cmd = ["gpg", "--batch", "--no-tty", + "--homedir", homedir, + "--no-default-keyring", + "--secret-keyring", secret_keyring, + "--keyring", keyring, + "--armor"] + + if detach_sign: + cmd.append("--detach-sign") + else: + cmd.append("--sign") + + p = Popen(cmd, stdin = PIPE, stdout = PIPE, stderr = PIPE) p.stdin.write(message) p.stdin.close() signature = p.stdout.read() @@ -94,40 +111,68 @@ def gpg_sign(methodname, args, secret_keyring, keyring): return signature -def gpg_verify(methodname, args, signature, key): +def gpg_verify(args, key, signature = None, methodname = None, methodresponse = False): """ - Verifys the signature of the method call using the specified public - key material. + Verifies the signature of the specified method call (methodname != + None) or response (methodresponse = True) using the specified + public key material. If args is not a tuple representing the + arguments to the method call or the method response value, then it + should be a string representing a generic message to verify (if + signature is specified) or verify/decrypt (if signature is not + specified). """ - message = canonicalize(methodname, args) + # Accept either an opaque string blob or a Python tuple + if isinstance(args, StringTypes): + message = args + else: + message = canonicalize(args, methodname, methodresponse) # Write public key to temporary file - keyfile = NamedTemporaryFile(suffix = '.pub') - keyfile.write(key) - keyfile.flush() + if os.path.exists(key): + keyfile = None + keyfilename = key + else: + keyfile = NamedTemporaryFile(suffix = '.pub') + keyfile.write(key) + keyfile.flush() + keyfilename = keyfile.name # Import public key into temporary keyring homedir = mkdtemp() - call(["gpg", "--batch", "--no-tty", "--homedir", homedir, "--import", keyfile.name], + call(["gpg", "--batch", "--no-tty", "--homedir", homedir, "--import", keyfilename], stdin = PIPE, stdout = PIPE, stderr = PIPE) - # Write detached signature to temporary file - sigfile = NamedTemporaryFile() - sigfile.write(signature) - sigfile.flush() - - # Verify signature - p = Popen(["gpg", "--batch", "--no-tty", "--homedir", homedir, "--verify", sigfile.name, "-"], - stdin = PIPE, stdout = PIPE, stderr = PIPE) + cmd = ["gpg", "--batch", "--no-tty", + "--homedir", homedir] + + if signature is not None: + # Write detached signature to temporary file + sigfile = NamedTemporaryFile() + sigfile.write(signature) + sigfile.flush() + cmd += ["--verify", sigfile.name, "-"] + else: + # Implicit signature + sigfile = None + cmd.append("--decrypt") + + p = Popen(cmd, stdin = PIPE, stdout = PIPE, stderr = PIPE) p.stdin.write(message) p.stdin.close() + if signature is None: + message = p.stdout.read() + err = p.stderr.read() rc = p.wait() # Clean up - sigfile.close() shutil.rmtree(homedir) - keyfile.close() + if sigfile: + sigfile.close() + if keyfile: + keyfile.close() if rc: - raise PLCAuthenticationFailure, "GPG verification failed with return code %d" % rc + raise PLCAuthenticationFailure, "GPG verification failed with return code %d: %s" % (rc, err) + + return message