1 #----------------------------------------------------------------------
2 # Copyright (c) 2014 Raytheon BBN Technologies
4 # Permission is hereby granted, free of charge, to any person obtaining
5 # a copy of this software and/or hardware specification (the "Work") to
6 # deal in the Work without restriction, including without limitation the
7 # rights to use, copy, modify, merge, publish, distribute, sublicense,
8 # and/or sell copies of the Work, and to permit persons to whom the Work
9 # is furnished to do so, subject to the following conditions:
11 # The above copyright notice and this permission notice shall be
12 # included in all copies or substantial portions of the Work.
14 # THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
22 #----------------------------------------------------------------------
24 from __future__ import print_function
27 from dateutil import parser as du_parser, tz as du_tz
33 from xml.dom.minidom import *
35 from sfa.util.sfatime import SFATIME_FORMAT
37 from sfa.trust.certificate import Certificate
38 from sfa.trust.credential import Credential, signature_format, HAVELXML
39 from sfa.trust.abac_credential import ABACCredential, ABACElement
40 from sfa.trust.credential_factory import CredentialFactory
41 from sfa.trust.gid import GID
42 from sfa.util.sfalogging import logger
43 from sfa.util.py23 import StringIO
45 # Routine to validate that a speaks-for credential
46 # says what it claims to say:
# It is a signed credential wherein the signer S is attesting to the
# ABAC statement:
# S.speaks_for(S)<-T, or "S says that T speaks for S"
51 # Requires that openssl be installed and in the path
52 # create_speaks_for requires that xmlsec1 be on the path
54 # Simple XML helper functions
56 # Find the text associated with first child text node
def findTextChildValue(root):
    """Return the text held by the first text-node child of *root*.

    The first direct child whose nodeName is '#text' is looked up with
    findChildNamed(); its nodeValue is returned converted with str().

    :param root: DOM node exposing ``childNodes``
    :return: the text as a str, or None when findChildNamed() finds no
        '#text' child
    """
    child = findChildNamed(root, '#text')
    # Compare against None explicitly instead of relying on node truthiness.
    if child is None:
        return None
    return str(child.nodeValue)
65 # Find first child with given name
def findChildNamed(root, name):
    """Return the first direct child of *root* whose nodeName equals *name*.

    Only ``root.childNodes`` is scanned (no recursion into grandchildren).

    :param root: DOM node exposing ``childNodes``
    :param name: node name to match, e.g. an element tag or '#text'
    :return: the first matching child node, or None if there is none
    """
    for child in root.childNodes:
        if child.nodeName == name:
            return child
    return None
74 # Write a string to a tempfile, returning name of tempfile
def write_to_tempfile(str):
    """Write *str* to a fresh temporary file and return the file's path.

    The file is created with tempfile.mkstemp() and is not deleted here;
    removing it is the caller's job.

    :param str: text or bytes to store.  Text is encoded as UTF-8 because
        the data is written through a binary file descriptor.  A falsy
        value (None, '' or b'') produces an empty file.  The parameter name
        shadows the builtin and is kept only for interface compatibility.
    :return: path of the temporary file
    """
    payload = str
    # Encode before creating the file so an encoding error leaves nothing
    # behind on disk.
    if payload and not isinstance(payload, (bytes, bytearray)):
        payload = payload.encode('utf-8')

    str_fd, str_file = tempfile.mkstemp()
    try:
        # The file object owns the descriptor and closes it on exit.
        with os.fdopen(str_fd, 'wb') as handle:
            if payload:
                handle.write(payload)
    except Exception:
        # Do not leave a half-written temp file behind.
        os.unlink(str_file)
        raise
    return str_file
84 # Run a subprocess and return output
def run_subprocess(cmd, stdout, stderr):
    """Run *cmd* to completion and return its captured output or exit status.

    :param cmd: argument list handed to subprocess.Popen (no shell involved)
    :param stdout: stdout disposition for Popen; pass subprocess.PIPE to
        capture what the child writes
    :param stderr: stderr disposition for Popen
    :return: the data the child wrote to stdout when *stdout* is
        subprocess.PIPE, otherwise the child's integer return code
    :raises Exception: if the process cannot be started or waited for; the
        message names the command line and the underlying error
    """
    try:
        proc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr)
        # communicate() drains any pipes while waiting for the child, so a
        # child that produces a lot of output cannot block on a full pipe.
        captured, _ = proc.communicate()
    except Exception as e:
        raise Exception(
            "Failed call to subprocess '{}': {}".format(" ".join(cmd), e))

    if stdout == subprocess.PIPE:
        return captured
    return proc.returncode
def get_cert_keyid(gid):
    """Extract the subject key identifier from the given certificate.

    Return the key id as a lowercase string with no colon separators
    between pairs. The key id as shown in the text output of a
    certificate is in uppercase with colon separators.

    :param gid: certificate object exposing ``get_extension(name)``
    :return: the normalized key id, e.g. 'AB:CD:EF' becomes 'abcdef'
    """
    raw_key_id = gid.get_extension('subjectKeyIdentifier')
    # Raw has colons separating pairs, and all characters are upper case.
    # Remove the colons and convert to lower case.
    return raw_key_id.replace(':', '').lower()
114 # Pull the cert out of a list of certs in a PEM formatted cert string
def grab_toplevel_cert(cert):
    """Return the base64 body of the first certificate in a PEM string.

    The text between the first '-----BEGIN CERTIFICATE-----' label and the
    following '-----END CERTIFICATE-----' label is returned as one string
    with all line breaks and other whitespace removed.

    :param cert: PEM text holding one or more concatenated certificates
    :return: the first certificate's body without labels or whitespace.
        When the BEGIN label is absent the body is taken from the start of
        the input; when the END label is absent it runs to the end of the
        input.
    """
    start_label = '-----BEGIN CERTIFICATE-----'
    end_label = '-----END CERTIFICATE-----'

    start_index = cert.find(start_label)
    start_index = 0 if start_index == -1 else start_index + len(start_label)

    # Look for the END label belonging to this certificate.  A missing label
    # must not be used as index -1, which would drop the final character.
    end_index = cert.find(end_label, start_index)
    if end_index == -1:
        end_index = len(cert)

    # split() without arguments also discards '\r' from CRLF line endings.
    return "".join(cert[start_index:end_index].split())
130 # Validate that the given speaks-for credential represents the
131 # statement User.speaks_for(User)<-Tool for the given user and tool certs
132 # and was signed by the user
# Return:
#   Boolean indicating whether the given credential
#      is not expired
#      is an ABAC credential
#      was signed by the user associated with the speaking_for_urn
#      is verified by xmlsec1
#      asserts U.speaks_for(U)<-T ("user says that T may speak for user")
#      validates against the schema, if a schema is provided
#      is trusted by given set of trusted roots (both user cert and tool cert)
#   GID (certificate object) of the speaking_for user if the above tests
#      succeed (None otherwise)
#   Error message indicating why the speaks_for call failed ("" otherwise)
def _xmlsec1_failure_detail(stderr_text, returncode):
    """Pick the most useful failure description out of xmlsec1's stderr."""
    marker = "msg="
    start = stderr_text.find(marker)
    if start > -1:
        detail = stderr_text[start + len(marker):]
        # The interesting text follows 'msg='; stop at the end of that line
        # (a backslash is also treated as a terminator, as the earlier code
        # did).
        for terminator in ('\\', '\n'):
            detail = detail.split(terminator, 1)[0]
        detail = detail.strip()
        if detail:
            return detail
    stderr_text = stderr_text.strip()
    if stderr_text:
        return stderr_text
    return "xmlsec1 exited with status {}".format(returncode)


def verify_speaks_for(cred, tool_gid, speaking_for_urn,
                      trusted_roots, schema=None, logger=None):
    """Check that *cred* states "user says tool speaks for user".

    The checks run in this order, and the first failure is reported:
      1. the credential has not expired
      2. it is an ABAC credential
      3. it carries a signature with the signer's certificate
      4. the signer's URN equals *speaking_for_urn*
      5. it has exactly one tail element
      6. ``xmlsec1 --verify`` accepts it (with *trusted_roots* passed as
         --trusted-pem arguments)
      7. head, tail and role assert U.speaks_for(U)<-T for the signer's and
         the tool's key ids
      8. it validates against *schema*, when lxml is available and the
         schema file exists
      9. the signer's and the tool's certificate chains verify against
         *trusted_roots*

    :param cred: ABAC credential object to examine
    :param tool_gid: certificate (GID) of the tool making the call
    :param speaking_for_urn: URN of the user the tool claims to speak for
    :param trusted_roots: trusted root certificates (objects with a
        ``filename`` attribute); may be empty or None
    :param schema: optional path of an XML schema to validate against
    :param logger: accepted for interface compatibility; not used here
    :return: tuple ``(ok, user_gid, message)``: ``(True, signer GID, "")``
        on success, ``(False, None, reason)`` otherwise
    :raises Exception: if xmlsec1 cannot be located or launched
    """

    # Credential has not expired
    if cred.expiration and cred.expiration < datetime.datetime.utcnow():
        return False, None, "ABAC Credential expired at {} ({})"\
            .format(cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred())

    # Must be ABAC
    cred_type = cred.get_cred_type()
    if cred_type != ABACCredential.ABAC_CREDENTIAL_TYPE:
        # Report the type itself; formatting the bound method would only
        # print its repr.
        return False, None, "Credential not of type ABAC but {}".format(cred_type)

    if cred.signature is None or cred.signature.gid is None:
        return False, None, "Credential malformed: missing signature or signer cert. Cred: {}"\
            .format(cred.pretty_cred())
    user_gid = cred.signature.gid
    user_urn = user_gid.get_urn()

    # URN of signer from cert must match URN of 'speaking-for' argument
    if user_urn != speaking_for_urn:
        return False, None, "User URN from cred doesn't match speaking_for URN: {} != {} (cred {})"\
            .format(user_urn, speaking_for_urn, cred.pretty_cred())

    tails = cred.get_tails()
    if len(tails) != 1:
        return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got {} ({})"\
            .format(len(tails), cred.pretty_cred())

    user_keyid = get_cert_keyid(user_gid)
    tool_keyid = get_cert_keyid(tool_gid)
    subject_keyid = tails[0].get_principal_keyid()

    head = cred.get_head()
    principal_keyid = head.get_principal_keyid()
    role = head.get_role()

    # Credential must pass xmlsec1 verify
    # FIXME: Why do we not need to specify the --node-id option as
    # credential.py does?
    xmlsec1 = cred.get_xmlsec1_path()
    if not xmlsec1:
        raise Exception("Could not locate required 'xmlsec1' program")
    cert_args = []
    for root in trusted_roots or []:
        cert_args += ['--trusted-pem', root.filename]

    cred_file = write_to_tempfile(cred.save_to_string())
    xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [cred_file]
    try:
        try:
            # stderr is captured so that a failure reason can be reported;
            # communicate() drains the pipe while waiting for the exit.
            proc = subprocess.Popen(xmlsec1_args, stderr=subprocess.PIPE)
            _, raw_err = proc.communicate()
        except Exception as e:
            raise Exception(
                "Failed call to subprocess '{}': {}".format(" ".join(xmlsec1_args), e))
    finally:
        # Remove the temporary copy whether or not xmlsec1 could be run.
        os.unlink(cred_file)

    if proc.returncode != 0:
        if raw_err is None:
            err_text = ""
        elif isinstance(raw_err, str):
            err_text = raw_err
        else:
            err_text = raw_err.decode('utf-8', 'replace')
        msg = _xmlsec1_failure_detail(err_text, proc.returncode)
        return False, None, "ABAC credential failed to xmlsec1 verify: {}".format(msg)

    # Must say U.speaks_for(U)<-T
    if user_keyid != principal_keyid or \
            tool_keyid != subject_keyid or \
            role != ('speaks_for_{}'.format(user_keyid)):
        return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T ({})"\
            .format(cred.pretty_cred())

    # If schema provided, validate against schema
    if HAVELXML and schema and os.path.exists(schema):
        from lxml import etree
        tree = etree.parse(StringIO(cred.xml))
        schema_doc = etree.parse(schema)
        xmlschema = etree.XMLSchema(schema_doc)
        if not xmlschema.validate(tree):
            error = xmlschema.error_log.last_error
            message = "{}: {} (line {})".format(
                cred.pretty_cred(), error.message, error.line)
            return False, None, ("XML Credential schema invalid: {}".format(message))

    # User certificate must validate against trusted roots
    try:
        user_gid.verify_chain(trusted_roots)
    except Exception as e:
        return False, None, \
            "Cred signer (user) cert not trusted: {}".format(e)

    # Tool certificate must validate against trusted roots
    try:
        tool_gid.verify_chain(trusted_roots)
    except Exception as e:
        return False, None, "Tool cert not trusted: {}".format(e)

    return True, user_gid, ""
246 # Determine if this is a speaks-for context. If so, validate
247 # And return either the tool_cert (not speaks-for or not validated)
248 # or the user cert (validated speaks-for)
250 # credentials is a list of GENI-style credentials:
#  each entry is either a credential XML string, a Credential object,
#  or a dict of the form
#     {'geni_type' : geni_type, 'geni_value' : cred_value,
#      'geni_version' : version}
254 # caller_gid is the raw X509 cert gid
255 # options is the dictionary of API-provided options
256 # trusted_roots is a list of Certificate objects from the system
257 # trusted_root directory
258 # Optionally, provide an XML schema against which to validate the credential
def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trusted_roots, schema=None):
    """Decide whose certificate a call should be attributed to.

    When *speaking_for_xrn* is given, every ABAC credential in
    *credentials* is checked with verify_speaks_for(); the first one that
    validates makes this a speaks-for call.

    :param logger: logger used to report rejected credentials; when None
        the logger from sfa.util.sfalogging is used
    :param credentials: iterable whose items are either a dict with
        'geni_type' and 'geni_value' keys, a Credential object, or a raw
        credential accepted by CredentialFactory
    :param caller_gid: certificate (GID) of the actual caller (the tool)
    :param speaking_for_xrn: XRN of the user being spoken for; a falsy
        value means this is not a speaks-for call
    :param trusted_roots: trusted root certificates handed to
        verify_speaks_for()
    :param schema: optional XML schema path handed to verify_speaks_for()
    :return: the user's GID when a valid speaks-for credential is found,
        otherwise *caller_gid*
    """
    if not speaking_for_xrn:
        return caller_gid  # Not speaks-for

    # Imported locally so this function does not depend on a module-level
    # import of Xrn.
    from sfa.util.xrn import Xrn
    if logger is None:
        # The 'logger' parameter shadows the module-level logger, so fetch
        # the default one explicitly.
        from sfa.util.sfalogging import logger as default_logger
        logger = default_logger

    speaking_for_urn = Xrn(speaking_for_xrn.strip()).get_urn()
    abac_type = ABACCredential.ABAC_CREDENTIAL_TYPE

    for cred in credentials:
        # Skip things that aren't ABAC credentials
        if isinstance(cred, dict):
            if cred['geni_type'] != abac_type:
                continue
            cred_value = cred['geni_value']
        elif isinstance(cred, Credential):
            if not isinstance(cred, ABACCredential):
                continue
            cred_value = cred
        else:
            if CredentialFactory.getType(cred) != abac_type:
                continue
            cred_value = cred

        # If the cred_value is xml, create the object.  Always verify the
        # unwrapped value, never the surrounding dict.
        if isinstance(cred_value, ABACCredential):
            abac_cred = cred_value
        else:
            abac_cred = CredentialFactory.createCred(cred_value)

        # See if this is a valid speaks_for
        is_valid_speaks_for, user_gid, msg = \
            verify_speaks_for(abac_cred,
                              caller_gid, speaking_for_urn,
                              trusted_roots, schema, logger=logger)

        if is_valid_speaks_for:
            return user_gid  # speaks-for
        logger.info("Got speaks-for option but not a valid speaks_for with this credential: {}"
                    .format(msg))

    return caller_gid  # Not speaks-for
# Create an ABAC Speaks For credential using the ABACCredential object and
# its encode & sign methods
def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filename, dur_days=365):
    """Create, sign and save a speaks-for credential via ABACCredential.

    The credential states that the tool may speak for the user: its head is
    the user's key id with role 'speaks_for_<user key id>' and its single
    tail is the tool's key id.

    :param tool_gid: certificate (GID) of the tool
    :param user_gid: certificate (GID) of the user who signs the credential
    :param ma_gid: certificate (GID) of the authority that issued the user
        certificate; appended to the user's certificate text if that text
        does not already end with it
    :param user_key_file: path of the user's private key
    :param cred_filename: path the signed credential is written to
    :param dur_days: validity period in days, counted from now (UTC)
    """
    logger.info("Creating ABAC SpeaksFor using ABACCredential...\n")
    # Write out the user cert
    from tempfile import mkstemp
    ma_str = ma_gid.save_to_string()
    user_cert_str = user_gid.save_to_string()
    if not user_cert_str.endswith(ma_str):
        user_cert_str += ma_str
    fd, user_cert_filename = mkstemp(suffix='cred', text=True)
    try:
        with os.fdopen(fd, "w") as fp:
            fp.write(user_cert_str)

        # Create the cred
        cred = ABACCredential()
        cred.set_issuer_keys(user_key_file, user_cert_filename)
        tool_urn = tool_gid.get_urn()
        user_urn = user_gid.get_urn()
        user_keyid = get_cert_keyid(user_gid)
        tool_keyid = get_cert_keyid(tool_gid)
        cred.head = ABACElement(user_keyid, user_urn,
                                "speaks_for_{}".format(user_keyid))
        cred.tails.append(ABACElement(tool_keyid, tool_urn))
        cred.set_expiration(datetime.datetime.utcnow() +
                            datetime.timedelta(days=dur_days))
        cred.expiration = cred.expiration.replace(microsecond=0)

        # Produce the cred XML
        cred.encode()

        # Sign it
        cred.sign()
        # Save it
        cred.save_to_file(cred_filename)
    finally:
        # The temporary issuer-certificate file is only needed while the
        # credential is being built and signed.
        if os.path.exists(user_cert_filename):
            os.remove(user_cert_filename)

    logger.info("Created ABAC credential: '{}' in file {}"
                .format(cred.pretty_cred(), cred_filename))
341 # FIXME: Assumes signer is itself signed by an 'ma_gid' that can be trusted
def create_speaks_for(tool_gid, user_gid, ma_gid,
                      user_key_file, cred_filename, dur_days=365):
    """Build a speaks-for credential from an XML template and sign it.

    The unsigned credential is written to a temporary file, signed with
    ``xmlsec1 --sign`` using the user's key and the user and MA
    certificates, and the signed result is written to *cred_filename*.

    :param tool_gid: certificate (GID) of the tool
    :param user_gid: certificate (GID) of the signing user
    :param ma_gid: certificate (GID) of the authority that signed the user
        certificate
    :param user_key_file: path of the user's private key
    :param cred_filename: path the signed credential is written to
    :param dur_days: validity period in days, counted from now (UTC)
    :raises Exception: if xmlsec1 cannot be located or launched
    """
    # Fail before creating any temporary file if the signer is unavailable.
    xmlsec1 = Credential.get_xmlsec1_path()
    if not xmlsec1:
        raise Exception("Could not locate required 'xmlsec1' program")

    tool_urn = tool_gid.get_urn()
    user_urn = user_gid.get_urn()

    refid = "ref0"

    credential_format = """\
<?xml version="1.0" encoding="UTF-8"?>
<signed-credential xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://www.geni.net/resources/credential/2/credential.xsd" xsi:schemaLocation="http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd">
 <credential xml:id="{refid}">
  <type>abac</type>
  <serial/>
  <owner_gid/>
  <owner_urn/>
  <target_gid/>
  <target_urn/>
  <uuid/>
  <expires>{expiration_str}</expires>
  <abac>
   <rt0>
    <version>{version}</version>
    <head>
     <ABACprincipal><keyid>{user_keyid}</keyid><mnemonic>{user_urn}</mnemonic></ABACprincipal>
     <role>speaks_for_{user_keyid}</role>
    </head>
    <tail>
     <ABACprincipal><keyid>{tool_keyid}</keyid><mnemonic>{tool_urn}</mnemonic></ABACprincipal>
    </tail>
   </rt0>
  </abac>
 </credential>
 <signatures>""" + signature_format + """\
 </signatures>
</signed-credential>\
"""

    credential_duration = datetime.timedelta(days=dur_days)
    expiration = datetime.datetime.utcnow() + credential_duration
    expiration_str = expiration.strftime(SFATIME_FORMAT)
    version = "1.1"

    user_keyid = get_cert_keyid(user_gid)
    tool_keyid = get_cert_keyid(tool_gid)
    # apply the format - itself uses signature_format which uses 'refid'.
    # The placeholders are filled from the local variables above, so those
    # names must match the template.
    unsigned_cred = credential_format.format(**locals())
    unsigned_cred_filename = write_to_tempfile(unsigned_cred)

    try:
        # Now sign the file with xmlsec1
        # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem
        # --output signed.xml tosign.xml
        pems = "{},{},{}".format(user_key_file, user_gid.get_filename(),
                                 ma_gid.get_filename())
        cmd = [xmlsec1, '--sign', '--privkey-pem', pems,
               '--output', cred_filename, unsigned_cred_filename]

        sign_proc_output = run_subprocess(cmd, stdout=subprocess.PIPE, stderr=None)
        # NOTE(review): with stdout piped, run_subprocess appears to return
        # the captured stdout rather than a status, so this test may never
        # be true - confirm how signing failures should be detected.
        if sign_proc_output is None:
            logger.info("xmlsec1 returns empty output")
        else:
            logger.info("Created ABAC credential: '{} speaks_for {}' in file {}"
                        .format(tool_urn, user_urn, cred_filename))
    finally:
        # Always remove the unsigned temporary copy, even if signing failed.
        os.unlink(unsigned_cred_filename)
# Command-line entry point.
#   --create FILE                 create a speaks-for credential in FILE
#                                 (needs --user_cert_file, --user_key_file
#                                 and --ma_cert_file)
#   --cred_file FILE --user_urn URN --trusted_roots_directory DIR
#                                 validate an existing credential for the
#                                 tool certificate given by --tool_cert_file
if __name__ == "__main__":

    parser = optparse.OptionParser()
    parser.add_option('--cred_file',
                      help='Name of credential file')
    parser.add_option('--tool_cert_file',
                      help='Name of file containing tool certificate')
    parser.add_option('--user_urn',
                      help='URN of speaks-for user')
    parser.add_option('--user_cert_file',
                      help="filename of x509 certificate of signing user")
    parser.add_option('--ma_cert_file',
                      help="filename of x509 cert of MA that signed user cert")
    parser.add_option('--user_key_file',
                      help="filename of private key of signing user")
    parser.add_option('--trusted_roots_directory',
                      help='Directory of trusted root certs')
    parser.add_option('--create',
                      help="name of file of ABAC speaksfor cred to create")
    parser.add_option('--useObject', action='store_true', default=False,
                      help='Use the ABACCredential object to create the credential (default False)')

    options, args = parser.parse_args()

    tool_gid = GID(filename=options.tool_cert_file)

    if options.create:
        if not (options.user_cert_file and options.user_key_file
                and options.ma_cert_file):
            # A string argument makes sys.exit() print it to stderr and
            # exit with a non-zero status.
            sys.exit("Usage: --create cred_file " +
                     "--user_cert_file user_cert_file" +
                     " --user_key_file user_key_file --ma_cert_file ma_cert_file")
        user_gid = GID(filename=options.user_cert_file)
        ma_gid = GID(filename=options.ma_cert_file)
        if options.useObject:
            create_sign_abaccred(tool_gid, user_gid, ma_gid,
                                 options.user_key_file,
                                 options.create)
        else:
            create_speaks_for(tool_gid, user_gid, ma_gid,
                              options.user_key_file,
                              options.create)
        sys.exit()

    user_urn = options.user_urn

    # Validation needs both the credential and the trusted roots.
    if not options.cred_file:
        sys.exit("Must supply --cred_file to validate a credential")
    if not options.trusted_roots_directory:
        sys.exit("Must supply --trusted_roots_directory to validate a credential")

    # Get list of trusted rootcerts
    trusted_roots_directory = options.trusted_roots_directory
    trusted_roots = \
        [Certificate(filename=os.path.join(trusted_roots_directory, entry))
         for entry in os.listdir(trusted_roots_directory)
         if entry.endswith('.pem') and entry != 'CATedCACerts.pem']

    with open(options.cred_file) as cred_fp:
        cred = cred_fp.read()

    creds = [{'geni_type': ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value': cred,
              'geni_version': '1'}]
    # determine_speaks_for() expects the user's XRN itself and a usable
    # logger.
    gid = determine_speaks_for(logger, creds, tool_gid,
                               user_urn,
                               trusted_roots)

    print('SPEAKS_FOR = {}'.format(gid != tool_gid))
    print("CERT URN = {}".format(gid.get_urn()))