Skip to content

Verify KeyCloak Access Token

Preface

This guide will explain how to validate a Subject's KeyCloak access token using the KeyCloak JSON Web Key Set (JWKS).

It is recommended you delegate this operation to the Organisational Policy following the method described in Using Organisational Policy, however the implementation example given in Implementing Manually can be used if required.

Using Organisational Policy

When loaded, you can delegate KeyCloak token verification decisions to the Organisational Policy Bundle by referencing the data.diamond.policy.token.claims variable in your policy and setting the ISSUER environment variable to point to the KeyCloak instance - e.g. https://authn.diamond.ac.uk/realms/master.

The example below shows how you might write a system package which allows the action if input.action is "do_thing" and the input.token is for the subject "bob".

Example

system.rego
package system

import data.diamond.policy.token
import rego.v1

# METADATA
# description: Allow bob to do a thing
# entrypoint: true
main := {"allow": allow}

default allow := false

# Allow bob to do the thing
allow if {
    input.action == "do_thing"
    token.claims.sub == "bob"
}

Implementing Manually

User claims are derived from the token, with verification performed against the JSON Web Key Set, with the Key Set cycled periodically.

package diamond.policy.token

import rego.v1

issuer := opa.runtime().env.ISSUER

jwks_endpoint := jwks_endpoint if {
    metadata := http.send({
        "url": concat("", [issuer, "/.well-known/openid-configuration"]),
        "method": "GET",
        "force_cache": true,
        "force_cache_duration_seconds": 86400,
    }).body
    jwks_endpoint := metadata.jwks_uri
}

fetch_jwks(url) := http.send({
    "url": url,
    "method": "GET",
    "force_cache": true,
    "force_cache_duration_seconds": 86400,
})

unverified := io.jwt.decode(input.token)

jwt_header := unverified[0]

jwks_url := concat("?", [jwks_endpoint, urlquery.encode_object({"kid": jwt_header.kid})])

jwks := fetch_jwks(jwks_url).raw_body

verified := io.jwt.decode_verify(input.token, {
    "cert": jwks,
    "iss": issuer,
    "aud": input.audience,
})

claims := verified[2] if verified[0]