autopep8
[sfa.git] / sfa / trust / speaksfor_util.py
index 640d512..12ee8d0 100644 (file)
@@ -42,7 +42,7 @@ from sfa.trust.gid import GID
 from sfa.util.sfalogging import logger
 from sfa.util.py23 import StringIO
 
-# Routine to validate that a speaks-for credential 
+# Routine to validate that a speaks-for credential
 # says what it claims to say:
 # It is a signed credential wherein the signer S is attesting to the
 # ABAC statement:
@@ -54,6 +54,8 @@ from sfa.util.py23 import StringIO
 # Simple XML helper functions
 
 # Find the text associated with first child text node
+
+
 def findTextChildValue(root):
     child = findChildNamed(root, '#text')
     if child:
@@ -61,6 +63,8 @@ def findTextChildValue(root):
     return None
 
 # Find first child with given name
+
+
 def findChildNamed(root, name):
     for child in root.childNodes:
         if child.nodeName == name:
@@ -68,6 +72,8 @@ def findChildNamed(root, name):
     return None
 
 # Write a string to a tempfile, returning name of tempfile
+
+
 def write_to_tempfile(str):
     str_fd, str_file = tempfile.mkstemp()
     if str:
@@ -76,6 +82,8 @@ def write_to_tempfile(str):
     return str_file
 
 # Run a subprocess and return output
+
+
 def run_subprocess(cmd, stdout, stderr):
     try:
         proc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr)
@@ -86,7 +94,9 @@ def run_subprocess(cmd, stdout, stderr):
             output = proc.returncode
         return output
     except Exception as e:
-        raise Exception("Failed call to subprocess '{}': {}".format(" ".join(cmd), e))
+        raise Exception(
+            "Failed call to subprocess '{}': {}".format(" ".join(cmd), e))
+
 
 def get_cert_keyid(gid):
     """Extract the subject key identifier from the given certificate.
@@ -102,6 +112,8 @@ def get_cert_keyid(gid):
     return keyid
 
 # Pull the cert out of a list of certs in a PEM formatted cert string
+
+
 def grab_toplevel_cert(cert):
     start_label = '-----BEGIN CERTIFICATE-----'
     if cert.find(start_label) > -1:
@@ -118,9 +130,9 @@ def grab_toplevel_cert(cert):
 # Validate that the given speaks-for credential represents the
 # statement User.speaks_for(User)<-Tool for the given user and tool certs
 # and was signed by the user
-# Return: 
-#   Boolean indicating whether the given credential 
-#      is not expired 
+# Return:
+#   Boolean indicating whether the given credential
+#      is not expired
 #      is an ABAC credential
 #      was signed by the user associated with the speaking_for_urn
 #      is verified by xmlsec1
@@ -130,6 +142,8 @@ def grab_toplevel_cert(cert):
 #   String user certificate of speaking_for user if the above tests succeed
 #      (None otherwise)
 #   Error message indicating why the speaks_for call failed ("" otherwise)
+
+
 def verify_speaks_for(cred, tool_gid, speaking_for_urn,
                       trusted_roots, schema=None, logger=None):
 
@@ -154,7 +168,7 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
             .format(user_urn, speaking_for_urn, cred.pretty_cred())
 
     tails = cred.get_tails()
-    if len(tails) != 1: 
+    if len(tails) != 1:
         return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got {} ({})"\
             .format(len(tails), cred.pretty_cred())
 
@@ -172,11 +186,12 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
     if trusted_roots:
         for x in trusted_roots:
             cert_args += ['--trusted-pem', x.filename]
-    # FIXME: Why do we not need to specify the --node-id option as credential.py does?
+    # FIXME: Why do we not need to specify the --node-id option as
+    # credential.py does?
     xmlsec1 = cred.get_xmlsec1_path()
     if not xmlsec1:
         raise Exception("Could not locate required 'xmlsec1' program")
-    xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [ cred_file]
+    xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [cred_file]
     output = run_subprocess(xmlsec1_args, stdout=None, stderr=subprocess.PIPE)
     os.unlink(cred_file)
     if output != 0:
@@ -208,7 +223,8 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
         xmlschema = etree.XMLSchema(schema_doc)
         if not xmlschema.validate(tree):
             error = xmlschema.error_log.last_error
-            message = "{}: {} (line {})".format(cred.pretty_cred(), error.message, error.line)
+            message = "{}: {} (line {})".format(
+                cred.pretty_cred(), error.message, error.line)
             return False, None, ("XML Credential schema invalid: {}".format(message))
 
     if trusted_roots:
@@ -233,20 +249,23 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
 #
 # credentials is a list of GENI-style credentials:
 #  Either a cred string xml string, or Credential object of a tuple
-#    [{'geni_type' : geni_type, 'geni_value : cred_value, 
+#    [{'geni_type' : geni_type, 'geni_value : cred_value,
 #      'geni_version' : version}]
 # caller_gid is the raw X509 cert gid
 # options is the dictionary of API-provided options
 # trusted_roots is a list of Certificate objects from the system
 #   trusted_root directory
 # Optionally, provide an XML schema against which to validate the credential
+
+
 def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trusted_roots, schema=None):
     if speaking_for_xrn:
-        speaking_for_urn = Xrn (speaking_for_xrn.strip()).get_urn()
+        speaking_for_urn = Xrn(speaking_for_xrn.strip()).get_urn()
         for cred in credentials:
             # Skip things that aren't ABAC credentials
             if type(cred) == dict:
-                if cred['geni_type'] != ABACCredential.ABAC_CREDENTIAL_TYPE: continue
+                if cred['geni_type'] != ABACCredential.ABAC_CREDENTIAL_TYPE:
+                    continue
                 cred_value = cred['geni_value']
             elif isinstance(cred, Credential):
                 if not isinstance(cred, ABACCredential):
@@ -254,7 +273,8 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus
                 else:
                     cred_value = cred
             else:
-                if CredentialFactory.getType(cred) != ABACCredential.ABAC_CREDENTIAL_TYPE: continue
+                if CredentialFactory.getType(cred) != ABACCredential.ABAC_CREDENTIAL_TYPE:
+                    continue
                 cred_value = cred
 
             # If the cred_value is xml, create the object
@@ -271,13 +291,16 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus
                                   trusted_roots, schema, logger=logger)
             logger.info(msg)
             if is_valid_speaks_for:
-                return user_gid # speaks-for
+                return user_gid  # speaks-for
             else:
                 logger.info("Got speaks-for option but not a valid speaks_for with this credential: {}"
                             .format(msg))
-    return caller_gid # Not speaks-for
+    return caller_gid  # Not speaks-for
+
+# Create an ABAC Speaks For credential using the ABACCredential object and
+# it's encode&sign methods
+
 
-# Create an ABAC Speaks For credential using the ABACCredential object and it's encode&sign methods
 def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filename, dur_days=365):
     logger.info("Creating ABAC SpeaksFor using ABACCredential...\n")
     # Write out the user cert
@@ -298,9 +321,11 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam
     user_urn = user_gid.get_urn()
     user_keyid = get_cert_keyid(user_gid)
     tool_keyid = get_cert_keyid(tool_gid)
-    cred.head = ABACElement(user_keyid, user_urn, "speaks_for_{}".format(user_keyid))
+    cred.head = ABACElement(user_keyid, user_urn,
+                            "speaks_for_{}".format(user_keyid))
     cred.tails.append(ABACElement(tool_keyid, tool_urn))
-    cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(days=dur_days))
+    cred.set_expiration(datetime.datetime.utcnow() +
+                        datetime.timedelta(days=dur_days))
     cred.expiration = cred.expiration.replace(microsecond=0)
 
     # Produce the cred XML
@@ -314,7 +339,9 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam
                 .format(cred.pretty_cred(), cred_filename))
 
 # FIXME: Assumes signer is itself signed by an 'ma_gid' that can be trusted
-def create_speaks_for(tool_gid, user_gid, ma_gid, 
+
+
+def create_speaks_for(tool_gid, user_gid, ma_gid,
                       user_key_file, cred_filename, dur_days=365):
     tool_urn = tool_gid.get_urn()
     user_urn = user_gid.get_urn()
@@ -363,14 +390,14 @@ def create_speaks_for(tool_gid, user_gid, ma_gid,
     unsigned_cred_filename = write_to_tempfile(unsigned_cred)
 
     # Now sign the file with xmlsec1
-    # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem 
+    # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem
     # --output signed.xml tosign.xml
     pems = "{},{},{}".format(user_key_file, user_gid.get_filename(),
                              ma_gid.get_filename())
     xmlsec1 = Credential.get_xmlsec1_path()
     if not xmlsec1:
         raise Exception("Could not locate required 'xmlsec1' program")
-    cmd = [ xmlsec1,  '--sign',  '--privkey-pem', pems, 
+    cmd = [xmlsec1,  '--sign',  '--privkey-pem', pems,
            '--output', cred_filename, unsigned_cred_filename]
 
 #    print(" ".join(cmd))
@@ -387,19 +414,19 @@ def create_speaks_for(tool_gid, user_gid, ma_gid,
 if __name__ == "__main__":
 
     parser = optparse.OptionParser()
-    parser.add_option('--cred_file', 
+    parser.add_option('--cred_file',
                       help='Name of credential file')
-    parser.add_option('--tool_cert_file', 
+    parser.add_option('--tool_cert_file',
                       help='Name of file containing tool certificate')
-    parser.add_option('--user_urn', 
+    parser.add_option('--user_urn',
                       help='URN of speaks-for user')
-    parser.add_option('--user_cert_file', 
+    parser.add_option('--user_cert_file',
                       help="filename of x509 certificate of signing user")
-    parser.add_option('--ma_cert_file', 
+    parser.add_option('--ma_cert_file',
                       help="filename of x509 cert of MA that signed user cert")
-    parser.add_option('--user_key_file', 
+    parser.add_option('--user_key_file',
                       help="filename of private key of signing user")
-    parser.add_option('--trusted_roots_directory', 
+    parser.add_option('--trusted_roots_directory',
                       help='Directory of trusted root certs')
     parser.add_option('--create',
                       help="name of file of ABAC speaksfor cred to create")
@@ -412,7 +439,7 @@ if __name__ == "__main__":
 
     if options.create:
         if options.user_cert_file and options.user_key_file \
-            and options.ma_cert_file:
+                and options.ma_cert_file:
             user_gid = GID(filename=options.user_cert_file)
             ma_gid = GID(filename=options.ma_cert_file)
             if options.useObject:
@@ -424,8 +451,8 @@ if __name__ == "__main__":
                                   options.user_key_file,
                                   options.create)
         else:
-            print("Usage: --create cred_file " + 
-                  "--user_cert_file user_cert_file" + 
+            print("Usage: --create cred_file " +
+                  "--user_cert_file user_cert_file" +
                   " --user_key_file user_key_file --ma_cert_file ma_cert_file")
         sys.exit()
 
@@ -437,18 +464,17 @@ if __name__ == "__main__":
 
     trusted_roots_directory = options.trusted_roots_directory
     trusted_roots = \
-        [Certificate(filename=os.path.join(trusted_roots_directory, file)) 
-             for file in os.listdir(trusted_roots_directory)
-             if file.endswith('.pem') and file != 'CATedCACerts.pem']
+        [Certificate(filename=os.path.join(trusted_roots_directory, file))
+         for file in os.listdir(trusted_roots_directory)
+         if file.endswith('.pem') and file != 'CATedCACerts.pem']
 
     cred = open(options.cred_file).read()
 
-    creds = [{'geni_type' : ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value' : cred, 
-              'geni_version' : '1'}]
+    creds = [{'geni_type': ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value': cred,
+              'geni_version': '1'}]
     gid = determine_speaks_for(None, creds, tool_gid,
-                               {'geni_speaking_for' : user_urn},
+                               {'geni_speaking_for': user_urn},
                                trusted_roots)
 
-
     print('SPEAKS_FOR = {}'.format(gid != tool_gid))
     print("CERT URN = {}".format(gid.get_urn()))