gleam-spacetraders-sdk/src/utils/jwt.gleam
Lily Rose 64f3729d0c
Some checks are pending
test / test (push) Waiting to run
Refactoring and general tidying up
2025-06-17 19:04:29 +10:00

149 lines
3.6 KiB
Gleam

import birl.{type Time}
import gleam/bit_array
import gleam/dynamic/decode.{type Decoder}
import gleam/json
import gleam/option.{type Option}
import gleam/result
import gleam/string
pub type JwtDecodeError {
MissingHeader
MissingPayload
MissingSignature
InvalidHeader
InvalidPayload
InvalidSignature
InvalidExpiration
TokenExpired
TokenNotValidYet
InvalidNotBefore
InvalidAlgorithm
}
pub type JwtAlgorithm {
HS256
HS384
HS512
RS256
RS384
RS512
ES256
ES384
ES512
PS256
PS384
PS512
None
}
pub fn parse_jwt_algorithm(value: String) -> Result(JwtAlgorithm, Nil) {
case value {
"HS256" -> Ok(HS256)
"HS384" -> Ok(HS384)
"HS512" -> Ok(HS512)
"RS256" -> Ok(RS256)
"RS384" -> Ok(RS384)
"RS512" -> Ok(RS512)
"ES256" -> Ok(ES256)
"ES384" -> Ok(ES384)
"ES512" -> Ok(ES512)
"PS256" -> Ok(PS256)
"PS384" -> Ok(PS384)
"PS512" -> Ok(PS512)
"none" -> Ok(None)
_ -> Error(Nil)
}
}
pub fn jwt_algorithm_decoder() -> Decoder(JwtAlgorithm) {
use value <- decode.then(decode.string)
case parse_jwt_algorithm(value) {
Ok(jwt_algorithm) -> decode.success(jwt_algorithm)
Error(Nil) -> decode.failure(None, "JwtAlgorithm")
}
}
pub type JwtHeader {
JwtHeader(algorithm: JwtAlgorithm, token_type: String)
}
fn jwt_header_decoder() -> Decoder(JwtHeader) {
use algorithm <- decode.field("alg", jwt_algorithm_decoder())
use token_type <- decode.field("typ", decode.string)
decode.success(JwtHeader(algorithm:, token_type:))
}
pub type JwtPayload {
JwtPayload(
identifier: String,
version: String,
reset_date: Option(Time),
issued_at: Time,
subject: String,
)
}
fn reset_date_decoder() -> Decoder(Time) {
use value <- decode.then(decode.string)
case birl.from_naive(value) {
Ok(time) -> decode.success(time)
Error(Nil) -> decode.failure(birl.now(), "Time")
}
}
fn jwt_payload_decoder() -> Decoder(JwtPayload) {
use identifier <- decode.field("identifier", decode.string)
use version <- decode.field("version", decode.string)
use reset_date <- decode.optional_field(
"reset_date",
option.None,
decode.optional(reset_date_decoder()),
)
use issued_at_int <- decode.field("iat", decode.int)
let issued_at = birl.from_unix(issued_at_int)
use subject <- decode.field("sub", decode.string)
decode.success(JwtPayload(
identifier:,
version:,
reset_date:,
issued_at:,
subject:,
))
}
pub type Jwt {
Jwt(header: JwtHeader, payload: JwtPayload, signature: String)
}
pub fn parse(token: String) -> Result(Jwt, JwtDecodeError) {
case string.split(token, ".") {
[encoded_header, encoded_payload, signature, ..] -> {
use header <- result.try(
bit_array.base64_url_decode(encoded_header)
|> result.replace_error(InvalidHeader)
|> result.then(fn(x) {
bit_array.to_string(x) |> result.replace_error(InvalidHeader)
})
|> result.then(fn(x) {
json.parse(x, jwt_header_decoder())
|> result.replace_error(InvalidHeader)
}),
)
use payload <- result.try(
bit_array.base64_url_decode(encoded_payload)
|> result.replace_error(InvalidPayload)
|> result.then(fn(x) {
bit_array.to_string(x) |> result.replace_error(InvalidPayload)
})
|> result.then(fn(x) {
json.parse(x, jwt_payload_decoder())
|> result.replace_error(InvalidPayload)
}),
)
Ok(Jwt(header:, payload:, signature:))
}
[_, _] -> Error(MissingSignature)
[_] -> Error(MissingPayload)
[] -> Error(MissingHeader)
}
}