1 #----------------------------------------------------------------------
2 # Copyright (c) 2014 Raytheon BBN Technologies
4 # Permission is hereby granted, free of charge, to any person obtaining
5 # a copy of this software and/or hardware specification (the "Work") to
6 # deal in the Work without restriction, including without limitation the
7 # rights to use, copy, modify, merge, publish, distribute, sublicense,
8 # and/or sell copies of the Work, and to permit persons to whom the Work
9 # is furnished to do so, subject to the following conditions:
11 # The above copyright notice and this permission notice shall be
12 # included in all copies or substantial portions of the Work.
14 # THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
22 #----------------------------------------------------------------------
24 from __future__ import print_function
27 from dateutil import parser as du_parser, tz as du_tz
33 from xml.dom.minidom import *
35 from sfa.util.sfatime import SFATIME_FORMAT
37 from sfa.trust.certificate import Certificate
38 from sfa.trust.credential import Credential, signature_format, HAVELXML
39 from sfa.trust.abac_credential import ABACCredential, ABACElement
40 from sfa.trust.credential_factory import CredentialFactory
41 from sfa.trust.gid import GID
42 from sfa.util.sfalogging import logger
43 from sfa.util.py23 import StringIO
# Routine to validate that a speaks-for credential
# says what it claims to say:
# It is a signed credential wherein the signer S is attesting to the
# ABAC statement:
# S.speaks_for(S)<-T Or "S says that T speaks for S"
#
# Requires that openssl be installed and in the path
# verify_speaks_for and create_speaks_for require that xmlsec1 be on the path
54 # Simple XML helper functions
# Find the text associated with first child text node
def findTextChildValue(root):
    """Return the value of the first '#text' child of root as a string.

    Returns None when root has no direct '#text' child, instead of
    failing on the missing node.
    """
    child = findChildNamed(root, '#text')
    if child is None:
        return None
    return str(child.nodeValue)
# Find first child with given name
def findChildNamed(root, name):
    """Return the first direct child of root whose nodeName equals name.

    Only root.childNodes is scanned (no recursion). Returns None when no
    child matches.
    """
    for child in root.childNodes:
        if child.nodeName == name:
            return child
    return None
# Write a string to a tempfile, returning name of tempfile
def write_to_tempfile(str):
    """Write the given string to a new temporary file and return its path.

    The file is created with tempfile.mkstemp(); the caller is responsible
    for deleting it. An empty or None argument produces an empty file.
    Text is encoded as UTF-8 before writing; bytes are written unchanged.
    """
    # NOTE: the parameter name shadows the builtin 'str'; it is kept so
    # that the signature stays unchanged for existing callers.
    str_fd, str_file = tempfile.mkstemp()
    # fdopen takes ownership of the descriptor, so it is closed even if
    # the write fails.
    with os.fdopen(str_fd, 'wb') as handle:
        if str:
            data = str if isinstance(str, bytes) else str.encode('utf-8')
            handle.write(data)
    return str_file
# Run a subprocess and return output
def run_subprocess(cmd, stdout, stderr):
    """Run cmd (an argument list) and wait for it to finish.

    stdout and stderr are passed straight to subprocess.Popen. When stdout
    is subprocess.PIPE the captured standard output is returned; otherwise
    the process return code is returned.

    Raises Exception (with the command line in the message) if the process
    cannot be started or communication with it fails.
    """
    try:
        proc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr)
        # communicate() drains any pipes while waiting, so a child that
        # produces a lot of output cannot block on a full pipe.
        captured_out, _ = proc.communicate()
        if stdout == subprocess.PIPE:
            return captured_out
        return proc.returncode
    except Exception as e:
        raise Exception("Failed call to subprocess '{}': {}".format(" ".join(cmd), e))
def get_cert_keyid(gid):
    """Extract the subject key identifier from the given certificate.

    Return the key id as a lowercase string with no colon separators
    between pairs. The key id as shown in the text output of a
    certificate is in uppercase with colon separators.
    """
    raw_key_id = gid.get_extension('subjectKeyIdentifier')
    # Raw has colons separating pairs, and all characters are upper case.
    # Remove the colons and convert to lower case.
    return raw_key_id.replace(':', '').lower()
# Pull the cert out of a list of certs in a PEM formatted cert string
def grab_toplevel_cert(cert):
    """Return the body of the first certificate in a PEM string.

    The text between the first BEGIN CERTIFICATE label (or the start of
    the string when there is none) and the following END CERTIFICATE
    label (or the end of the string when there is none) is returned with
    all newlines removed.
    """
    start_label = '-----BEGIN CERTIFICATE-----'
    end_label = '-----END CERTIFICATE-----'

    label_pos = cert.find(start_label)
    start_index = label_pos + len(start_label) if label_pos > -1 else 0

    # Look for the end label after the start; a missing label must not be
    # used as index -1, which would silently drop the last character.
    end_index = cert.find(end_label, start_index)
    if end_index == -1:
        end_index = len(cert)

    return cert[start_index:end_index].replace('\n', '')
# Validate that the given speaks-for credential represents the
# statement User.speaks_for(User)<-Tool for the given user and tool certs
# and was signed by the user
def verify_speaks_for(cred, tool_gid, speaking_for_urn,
                      trusted_roots, schema=None, logger=None):
    """Check that cred is a valid speaks-for credential for the given tool.

    Arguments:
      cred: credential object to check
      tool_gid: certificate of the tool claiming to speak for the user
      speaking_for_urn: URN of the user being spoken for
      trusted_roots: iterable of root certificates (each with a
        'filename' attribute); may be None or empty
      schema: optional path of an XML schema to validate the credential
        against (only used when lxml is available and the file exists)
      logger: accepted for interface compatibility; not used here

    Returns a 3-tuple (valid, user_gid, message):
      valid is True only if the credential
        - has not expired
        - is an ABAC credential
        - was signed by the user associated with speaking_for_urn
        - has exactly one tail element
        - is verified by xmlsec1
        - asserts U.speaks_for(U)<-T ("user says that T may speak for user")
        - validates against the schema, if one was provided
        - has user and tool certs that verify against trusted_roots
      user_gid is the signer's certificate when valid, otherwise None
      message is "" when valid, otherwise the reason for the failure

    Raises Exception if the xmlsec1 program cannot be located or run.
    """
    # Credential has not expired
    if cred.expiration and cred.expiration < datetime.datetime.utcnow():
        return False, None, "ABAC Credential expired at {} ({})"\
            .format(cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred())

    # Must be ABAC
    if cred.get_cred_type() != ABACCredential.ABAC_CREDENTIAL_TYPE:
        return False, None, "Credential not of type ABAC but {}".format(cred.get_cred_type())

    if cred.signature is None or cred.signature.gid is None:
        return False, None, "Credential malformed: missing signature or signer cert. Cred: {}"\
            .format(cred.pretty_cred())
    user_gid = cred.signature.gid
    user_urn = user_gid.get_urn()

    # URN of signer from cert must match URN of 'speaking-for' argument
    if user_urn != speaking_for_urn:
        return False, None, "User URN from cred doesn't match speaking_for URN: {} != {} (cred {})"\
            .format(user_urn, speaking_for_urn, cred.pretty_cred())

    tails = cred.get_tails()
    if len(tails) != 1:
        return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got {} ({})"\
            .format(len(tails), cred.pretty_cred())

    user_keyid = get_cert_keyid(user_gid)
    tool_keyid = get_cert_keyid(tool_gid)
    subject_keyid = tails[0].get_principal_keyid()

    head = cred.get_head()
    principal_keyid = head.get_principal_keyid()
    role = head.get_role()

    # Credential must pass xmlsec1 verify
    cred_file = write_to_tempfile(cred.save_to_string())
    try:
        cert_args = []
        for root in trusted_roots or []:
            cert_args += ['--trusted-pem', root.filename]
        # FIXME: Why do we not need to specify the --node-id option as credential.py does?
        xmlsec1 = cred.get_xmlsec1_path()
        if not xmlsec1:
            raise Exception("Could not locate required 'xmlsec1' program")
        xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [cred_file]
        output = run_subprocess(xmlsec1_args, stdout=None, stderr=subprocess.PIPE)
    finally:
        # Remove the temporary copy of the credential on every path,
        # including when xmlsec1 is missing or cannot be run.
        os.unlink(cred_file)

    if output != 0:
        # With stdout=None only the exit status is available here, so
        # that status is all that can be reported.
        # FIXME: xmlsec1 errors carry a 'msg=' detail that would be more
        # useful, but its text is not captured by this call.
        return False, None, "ABAC credential failed to xmlsec1 verify: {}".format(output)

    # Must say U.speaks_for(U)<-T
    if user_keyid != principal_keyid or \
            tool_keyid != subject_keyid or \
            role != ('speaks_for_{}'.format(user_keyid)):
        return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T ({})"\
            .format(cred.pretty_cred())

    # If schema provided, validate against schema
    if HAVELXML and schema and os.path.exists(schema):
        from lxml import etree
        tree = etree.parse(StringIO(cred.xml))
        schema_doc = etree.parse(schema)
        xmlschema = etree.XMLSchema(schema_doc)
        if not xmlschema.validate(tree):
            error = xmlschema.error_log.last_error
            message = "{}: {} (line {})".format(cred.pretty_cred(), error.message, error.line)
            return False, None, ("XML Credential schema invalid: {}".format(message))

    # User certificate must validate against trusted roots
    try:
        user_gid.verify_chain(trusted_roots)
    except Exception as e:
        return False, None, \
            "Cred signer (user) cert not trusted: {}".format(e)

    # Tool certificate must validate against trusted roots
    try:
        tool_gid.verify_chain(trusted_roots)
    except Exception as e:
        return False, None, "Tool cert not trusted: {}".format(e)

    return True, user_gid, ""
# Determine if this is a speaks-for context. If so, validate
# And return either the tool_cert (not speaks-for or not validated)
# or the user cert (validated speaks-for)
def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trusted_roots, schema=None):
    """Return the certificate the caller is effectively acting as.

    Arguments:
      logger: logger used to report rejected credentials; may be None
      credentials: list of GENI-style credentials, each one either
        a dict {'geni_type': ..., 'geni_value': ..., 'geni_version': ...},
        a Credential object, or a credential XML string
      caller_gid: certificate of the caller (the tool)
      speaking_for_xrn: identifier of the user being spoken for; when
        empty or None this is not a speaks-for call
      trusted_roots: list of Certificate objects for the trusted roots
      schema: optional XML schema path to validate the credential against

    Returns the user certificate from the first credential that
    verify_speaks_for accepts, otherwise caller_gid.
    """
    if not speaking_for_xrn:
        return caller_gid  # Not speaks-for

    # Imported here because no module-level import of Xrn is visible.
    from sfa.util.xrn import Xrn

    speaking_for_urn = Xrn(speaking_for_xrn.strip()).get_urn()
    for cred in credentials:
        # Skip things that aren't ABAC credentials
        if isinstance(cred, dict):
            if cred['geni_type'] != ABACCredential.ABAC_CREDENTIAL_TYPE:
                continue
            cred_value = cred['geni_value']
        elif isinstance(cred, Credential):
            if not isinstance(cred, ABACCredential):
                continue
            cred_value = cred
        else:
            if CredentialFactory.getType(cred) != ABACCredential.ABAC_CREDENTIAL_TYPE:
                continue
            cred_value = cred

        # If the cred_value is xml, create the object. Always verify the
        # extracted value, never the wrapping dict.
        if isinstance(cred_value, ABACCredential):
            abac_cred = cred_value
        else:
            abac_cred = CredentialFactory.createCred(cred_value)

        # See if this is a valid speaks_for
        is_valid_speaks_for, user_gid, msg = \
            verify_speaks_for(abac_cred,
                              caller_gid, speaking_for_urn,
                              trusted_roots, schema, logger=logger)

        if is_valid_speaks_for:
            return user_gid  # speaks-for

        # The command-line entry point passes logger=None
        if logger is not None:
            logger.info("Got speaks-for option but not a valid speaks_for with this credential: {}"
                        .format(msg))

    return caller_gid  # Not speaks-for
# Create an ABAC Speaks For credential using the ABACCredential object and its encode&sign methods
def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filename, dur_days=365):
    """Create and sign a speaks-for credential and save it to cred_filename.

    The credential states that the tool (tool_gid) speaks for the user
    (user_gid). It is signed with user_key_file and a temporary cert file
    holding the user cert followed by the MA cert (ma_gid), and it expires
    dur_days days from now.
    """
    logger.info("Creating ABAC SpeaksFor using ABACCredential...\n")
    # Write out the user cert
    from tempfile import mkstemp
    ma_str = ma_gid.save_to_string()
    user_cert_str = user_gid.save_to_string()
    # Append the MA cert unless the user cert string already ends with it
    if not user_cert_str.endswith(ma_str):
        user_cert_str += ma_str
    cert_fd, user_cert_filename = mkstemp(suffix='cred', text=True)
    try:
        with os.fdopen(cert_fd, "w") as cert_file:
            cert_file.write(user_cert_str)

        # Create the cred
        cred = ABACCredential()
        cred.set_issuer_keys(user_key_file, user_cert_filename)
        tool_urn = tool_gid.get_urn()
        user_urn = user_gid.get_urn()
        user_keyid = get_cert_keyid(user_gid)
        tool_keyid = get_cert_keyid(tool_gid)
        cred.head = ABACElement(user_keyid, user_urn, "speaks_for_{}".format(user_keyid))
        cred.tails.append(ABACElement(tool_keyid, tool_urn))
        cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(days=dur_days))
        cred.expiration = cred.expiration.replace(microsecond=0)

        # Produce the cred XML
        cred.encode()

        # Sign it
        cred.sign()
        # Save it
        cred.save_to_file(cred_filename)
        logger.info("Created ABAC credential: '{}' in file {}"
                    .format(cred.pretty_cred(), cred_filename))
    finally:
        # The temporary cert file is only needed while the credential is
        # being built and signed; don't leave it behind.
        os.unlink(user_cert_filename)
# FIXME: Assumes signer is itself signed by an 'ma_gid' that can be trusted
def create_speaks_for(tool_gid, user_gid, ma_gid,
                      user_key_file, cred_filename, dur_days=365):
    """Build a speaks-for credential from an XML template and sign it.

    The unsigned credential (user says tool speaks for user, expiring
    dur_days days from now) is written to a temporary file, then signed
    with xmlsec1 using user_key_file plus the cert files of user_gid and
    ma_gid. The signed result is written to cred_filename.

    Raises Exception if xmlsec1 cannot be located or run.
    """
    tool_urn = tool_gid.get_urn()
    user_urn = user_gid.get_urn()

    refid = "ref0"

    credential_format = """\
<?xml version="1.0" encoding="UTF-8"?>
<signed-credential xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://www.geni.net/resources/credential/2/credential.xsd" xsi:schemaLocation="http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd">
 <credential xml:id="{refid}">
  <type>abac</type>
  <serial/>
  <owner_gid/>
  <owner_urn/>
  <target_gid/>
  <target_urn/>
  <uuid/>
  <expires>{expiration_str}</expires>
  <abac>
   <rt0>
    <version>{version}</version>
    <head>
     <ABACprincipal><keyid>{user_keyid}</keyid><mnemonic>{user_urn}</mnemonic></ABACprincipal>
     <role>speaks_for_{user_keyid}</role>
    </head>
    <tail>
     <ABACprincipal><keyid>{tool_keyid}</keyid><mnemonic>{tool_urn}</mnemonic></ABACprincipal>
    </tail>
   </rt0>
  </abac>
 </credential>
 <signatures>""" + signature_format + """\
 </signatures>
</signed-credential>\
"""

    credential_duration = datetime.timedelta(days=dur_days)
    expiration = datetime.datetime.utcnow() + credential_duration
    expiration_str = expiration.strftime(SFATIME_FORMAT)
    version = "1.1"

    user_keyid = get_cert_keyid(user_gid)
    tool_keyid = get_cert_keyid(tool_gid)
    # apply the format - itself uses signature_format which uses 'refid'
    unsigned_cred = credential_format.format(
        refid=refid,
        expiration_str=expiration_str,
        version=version,
        user_keyid=user_keyid,
        user_urn=user_urn,
        tool_keyid=tool_keyid,
        tool_urn=tool_urn,
    )
    unsigned_cred_filename = write_to_tempfile(unsigned_cred)

    try:
        # Now sign the file with xmlsec1
        # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem
        # --output signed.xml tosign.xml
        pems = "{},{},{}".format(user_key_file, user_gid.get_filename(),
                                 ma_gid.get_filename())
        xmlsec1 = Credential.get_xmlsec1_path()
        if not xmlsec1:
            raise Exception("Could not locate required 'xmlsec1' program")
        cmd = [xmlsec1, '--sign', '--privkey-pem', pems,
               '--output', cred_filename, unsigned_cred_filename]

        sign_proc_output = run_subprocess(cmd, stdout=subprocess.PIPE, stderr=None)
        if sign_proc_output is None:
            logger.info("xmlsec1 returns empty output")
        else:
            logger.info("Created ABAC credential: '{} speaks_for {}' in file {}"
                        .format(tool_urn, user_urn, cred_filename))
    finally:
        # Always remove the unsigned temporary credential, including when
        # xmlsec1 is missing or the signing call raises.
        os.unlink(unsigned_cred_filename)
# Command-line entry point: either create a speaks-for credential
# (--create) or validate an existing one (--cred_file) and report
# whether the tool may speak for the given user.
if __name__ == "__main__":

    parser = optparse.OptionParser()
    parser.add_option('--cred_file',
                      help='Name of credential file')
    parser.add_option('--tool_cert_file',
                      help='Name of file containing tool certificate')
    parser.add_option('--user_urn',
                      help='URN of speaks-for user')
    parser.add_option('--user_cert_file',
                      help="filename of x509 certificate of signing user")
    parser.add_option('--ma_cert_file',
                      help="filename of x509 cert of MA that signed user cert")
    parser.add_option('--user_key_file',
                      help="filename of private key of signing user")
    parser.add_option('--trusted_roots_directory',
                      help='Directory of trusted root certs')
    parser.add_option('--create',
                      help="name of file of ABAC speaksfor cred to create")
    parser.add_option('--useObject', action='store_true', default=False,
                      help='Use the ABACCredential object to create the credential (default False)')

    options, args = parser.parse_args(sys.argv)

    tool_gid = GID(filename=options.tool_cert_file)

    if options.create:
        if options.user_cert_file and options.user_key_file \
                and options.ma_cert_file:
            user_gid = GID(filename=options.user_cert_file)
            ma_gid = GID(filename=options.ma_cert_file)
            if options.useObject:
                create_sign_abaccred(tool_gid, user_gid, ma_gid,
                                     options.user_key_file,
                                     options.create)
            else:
                create_speaks_for(tool_gid, user_gid, ma_gid,
                                  options.user_key_file,
                                  options.create)
        else:
            print("Usage: --create cred_file " +
                  "--user_cert_file user_cert_file" +
                  " --user_key_file user_key_file --ma_cert_file ma_cert_file")
        sys.exit()

    user_urn = options.user_urn

    # Validation needs both the credential and the trusted roots
    if not options.cred_file:
        sys.exit("Must supply --cred_file to validate a credential")
    if not options.trusted_roots_directory:
        sys.exit("Must supply --trusted_roots_directory to validate a credential")

    # Get list of trusted rootcerts
    trusted_roots_directory = options.trusted_roots_directory
    trusted_roots = \
        [Certificate(filename=os.path.join(trusted_roots_directory, pem_name))
         for pem_name in os.listdir(trusted_roots_directory)
         if pem_name.endswith('.pem') and pem_name != 'CATedCACerts.pem']

    with open(options.cred_file) as cred_fp:
        cred = cred_fp.read()

    creds = [{'geni_type': ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value': cred,
              'geni_version': '1'}]
    # determine_speaks_for calls .strip() on its fourth argument and
    # .info() on its logger, so pass the URN string and a real logger.
    gid = determine_speaks_for(logger, creds, tool_gid,
                               user_urn,
                               trusted_roots)

    print('SPEAKS_FOR = {}'.format(gid != tool_gid))
    print("CERT URN = {}".format(gid.get_urn()))