Skip to content

Commit

Permalink
fix: Improve Authz performance by reducing queries required (#1239)
Browse files Browse the repository at this point in the history
* Removes set_config for jwt token which is not used
* Reduces read check to single query instead of 2 queries
* Streamlines overall code to be leaner
  • Loading branch information
filipecabaco authored Dec 2, 2024
1 parent 93fb070 commit 4eaa930
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 579 deletions.
83 changes: 48 additions & 35 deletions lib/realtime/tenants/authorization.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ defmodule Realtime.Tenants.Authorization do
alias Realtime.Api.Message
alias Realtime.Repo
alias Realtime.Tenants.Authorization.Policies
alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies
alias Realtime.Tenants.Authorization.Policies.PresencePolicies

defstruct [:topic, :headers, :jwt, :claims, :role]

Expand Down Expand Up @@ -85,7 +83,6 @@ defmodule Realtime.Tenants.Authorization do
* role: The role of the user
* realtime.topic: The name of the channel being accessed
* request.jwt.claim.role: The role of the user
* request.jwt: The JWT token
* request.jwt.claim.sub: The sub claim of the JWT token
* request.jwt.claims: The claims of the JWT token
* request.headers: The headers of the request
Expand All @@ -96,7 +93,6 @@ defmodule Realtime.Tenants.Authorization do
%__MODULE__{
topic: topic,
headers: headers,
jwt: jwt,
claims: claims,
role: role
} = authorization_context
Expand All @@ -110,15 +106,13 @@ defmodule Realtime.Tenants.Authorization do
SELECT
set_config('role', $1, true),
set_config('realtime.topic', $2, true),
set_config('request.jwt', $3, true),
set_config('request.jwt.claims', $4, true),
set_config('request.headers', $5, true)
set_config('request.jwt.claims', $3, true),
set_config('request.headers', $4, true)
""",
[role, topic, jwt, claims, headers]
[role, topic, claims, headers]
)
end

@policies_mods [BroadcastPolicies, PresencePolicies]
defp get_policies_for_connection(conn, authorization_context) do
Database.transaction(conn, fn transaction_conn ->
messages = [
Expand All @@ -131,24 +125,23 @@ defmodule Realtime.Tenants.Authorization do
{[%{id: broadcast_id}], [%{id: presence_id}]} =
Enum.split_with(messages, &(&1.extension == :broadcast))

ids = %{presence_id: presence_id, broadcast_id: broadcast_id}

set_conn_config(transaction_conn, authorization_context)
policies = %Policies{}

policies =
get_read_policy_for_connection_and_extension(
transaction_conn,
ids,
policies,
authorization_context
authorization_context,
broadcast_id,
presence_id,
policies
)

policies =
get_write_policy_for_connection_and_extension(
transaction_conn,
policies,
authorization_context
authorization_context,
policies
)

Postgrex.query!(transaction_conn, "ROLLBACK AND CHAIN", [])
Expand All @@ -157,27 +150,47 @@ defmodule Realtime.Tenants.Authorization do
end)
end

defp get_read_policy_for_connection_and_extension(conn, ids, policies, authorization_context) do
Enum.reduce_while(@policies_mods, policies, fn policy_mod, policies ->
res = policy_mod.check_read_policies(conn, ids, policies, authorization_context)

case res do
{:error, error} -> {:halt, {:error, error}}
%DBConnection.TransactionError{} = err -> {:halt, err}
{:ok, policy} -> {:cont, policy}
end
end)
import Ecto.Query

defp get_read_policy_for_connection_and_extension(
conn,
authorization_context,
broadcast_id,
presence_id,
policies
) do
query =
from(m in Message,
where: [topic: ^authorization_context.topic],
where: [extension: :broadcast, id: ^broadcast_id],
or_where: [extension: :presence, id: ^presence_id]
)

{:ok, res} = Repo.all(conn, query, Message)
can_presence? = Enum.any?(res, fn %{id: id} -> id == presence_id end)
can_broadcast? = Enum.any?(res, fn %{id: id} -> id == broadcast_id end)

policies
|> Policies.update_policies(:presence, :read, can_presence?)
|> Policies.update_policies(:broadcast, :read, can_broadcast?)
end

defp get_write_policy_for_connection_and_extension(conn, policies, authorization_context) do
Enum.reduce_while(@policies_mods, policies, fn policy_mod, policies ->
res = policy_mod.check_write_policies(conn, policies, authorization_context)
defp get_write_policy_for_connection_and_extension(
conn,
authorization_context,
policies
) do
broadcast_changeset =
Message.changeset(%Message{}, %{topic: authorization_context.topic, extension: :broadcast})

case res do
{:error, error} -> {:halt, {:error, error}}
%DBConnection.TransactionError{} = err -> {:halt, err}
{:ok, policy} -> {:cont, policy}
end
end)
presence_changeset =
Message.changeset(%Message{}, %{topic: authorization_context.topic, extension: :presence})

broadcast_result = Repo.insert(conn, broadcast_changeset, Message, mode: :savepoint)
presence_result = Repo.insert(conn, presence_changeset, Message, mode: :savepoint)

policies
|> Policies.update_policies(:presence, :write, match?({:ok, _}, presence_result))
|> Policies.update_policies(:broadcast, :write, match?({:ok, _}, broadcast_result))
end
end
28 changes: 2 additions & 26 deletions lib/realtime/tenants/authorization/policies.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ defmodule Realtime.Tenants.Authorization.Policies do
@moduledoc """
Policies structure that holds the required authorization information for a given connection.
Also defines a behaviour to be used by the different authorization modules to build and check policies within the context of an entity.
Currently there are two types of policies:
* Realtime.Tenants.Authorization.Policies.BroadcastPolicies - Used to check access to the Broadcast feature on a given Topic
* Realtime.Tenants.Authorization.Policies.PresencePolicies - Used to check access to Presence feature on a given Topic
* Realtime.Tenants.Authorization.Policies.BroadcastPolicies - Used to store the access to Broadcast feature on a given Topic
* Realtime.Tenants.Authorization.Policies.PresencePolicies - Used to store the access to Presence feature on a given Topic
"""

alias Realtime.Tenants.Authorization
alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies
alias Realtime.Tenants.Authorization.Policies.PresencePolicies

Expand All @@ -21,27 +18,6 @@ defmodule Realtime.Tenants.Authorization.Policies do
presence: PresencePolicies.t()
}

@doc """
Implementation of the method on how to check read policies for a given entity within the context of a database connection
Arguments:
* `db_conn` - The database connection with the required context to properly run checks
* `policies` - The policies struct to which the result will be accumulated
* `authorization` - The authorization struct with required information for Policy checking
"""
@callback check_read_policies(DBConnection.t(), map(), t(), Authorization.t()) ::
{:ok, t()} | {:error, any()}
@doc """
Implementation of the method on how to check write policies for a given entity within the context of a database connection
Arguments:
* `db_conn` - The database connection with the required context to properly run checks
* `policies` - The policies struct to which the result will be accumulated
* `authorization` - The authorization struct with required information for policy checking
"""
@callback check_write_policies(DBConnection.t(), t(), Authorization.t()) ::
{:ok, t()} | {:error, any()}

@doc """
Updates the Policies struct sub key with the given value.
"""
Expand Down
70 changes: 0 additions & 70 deletions lib/realtime/tenants/authorization/policies/broadcast_policies.ex
Original file line number Diff line number Diff line change
@@ -1,83 +1,13 @@
defmodule Realtime.Tenants.Authorization.Policies.BroadcastPolicies do
@moduledoc """
BroadcastPolicies structure that holds the required authorization information for a given connection within the scope of a sending / receiving broadcasts messages
Uses the Realtime.Api.Broadcast to try reads and writes on the database to determine authorization for a given connection.
Implements Realtime.Tenants.Authorization behaviour
"""
require Logger
import Ecto.Query
import Realtime.Logs

alias Realtime.Api.Message
alias Realtime.Repo
alias Realtime.Tenants.Authorization
alias Realtime.Tenants.Authorization.Policies
defstruct read: false, write: false

@behaviour Realtime.Tenants.Authorization.Policies

@type t :: %__MODULE__{
read: boolean(),
write: boolean()
}
@impl true
def check_read_policies(_conn, _, policies, %Authorization{topic: nil}) do
{:ok, Policies.update_policies(policies, :broadcast, :read, false)}
end

def check_read_policies(conn, %{broadcast_id: id}, %Policies{} = policies, %Authorization{
topic: topic
}) do
query =
from(m in Message,
where: m.topic == ^topic,
where: m.extension == :broadcast,
where: m.id == ^id
)

case Repo.all(conn, query, Message, mode: :savepoint) do
{:ok, []} ->
{:ok, Policies.update_policies(policies, :broadcast, :read, false)}

{:ok, [%Message{}]} ->
{:ok, Policies.update_policies(policies, :broadcast, :read, true)}

{:error, %Postgrex.Error{postgres: %{code: :insufficient_privilege}}} ->
{:ok, Policies.update_policies(policies, :broadcast, :read, false)}

{:error, error} ->
log_error(
"UnableToSetPolicies",
"Error getting policies for connection: #{to_log(error)}"
)

Postgrex.rollback(conn, error)
end
end

@impl true
def check_write_policies(_conn, policies, %Authorization{topic: nil}) do
{:ok, Policies.update_policies(policies, :broadcast, :write, false)}
end

def check_write_policies(conn, policies, %Authorization{topic: topic}) do
changeset =
Message.changeset(%Message{}, %{topic: topic, extension: :broadcast})

case Repo.insert(conn, changeset, Message, mode: :savepoint) do
{:ok, %Message{}} ->
{:ok, Policies.update_policies(policies, :broadcast, :write, true)}

{:error, %Postgrex.Error{postgres: %{code: :insufficient_privilege}}} ->
{:ok, Policies.update_policies(policies, :broadcast, :write, false)}

{:error, error} ->
log_error(
"UnableToSetPolicies",
"Error getting policies for connection: #{to_log(error)}"
)
end
end
end
72 changes: 0 additions & 72 deletions lib/realtime/tenants/authorization/policies/presence_policies.ex
Original file line number Diff line number Diff line change
@@ -1,85 +1,13 @@
defmodule Realtime.Tenants.Authorization.Policies.PresencePolicies do
@moduledoc """
PresencePolicies structure that holds the required authorization information for a given connection within the scope of a tracking / receiving presence messages
Uses the Realtime.Api.Presence to try reads and writes on the database to determine authorization for a given connection.
Implements Realtime.Tenants.Authorization behaviour
"""
require Logger
import Ecto.Query
import Realtime.Logs

alias Realtime.Api.Message
alias Realtime.Repo
alias Realtime.Tenants.Authorization
alias Realtime.Tenants.Authorization.Policies

defstruct read: false, write: false

@behaviour Realtime.Tenants.Authorization.Policies

@type t :: %__MODULE__{
read: boolean(),
write: boolean()
}
@impl true
def check_read_policies(_conn, _, policies, %Authorization{topic: nil}) do
{:ok, Policies.update_policies(policies, :presence, :read, false)}
end

def check_read_policies(conn, %{presence_id: id}, %Policies{} = policies, %Authorization{
topic: topic
}) do
query =
from(m in Message,
where: m.topic == ^topic,
where: m.extension == :presence,
where: m.id == ^id
)

case Repo.all(conn, query, Message, mode: :savepoint) do
{:ok, []} ->
{:ok, Policies.update_policies(policies, :presence, :read, false)}

{:ok, [%Message{}]} ->
{:ok, Policies.update_policies(policies, :presence, :read, true)}

{:error, %Postgrex.Error{postgres: %{code: :insufficient_privilege}}} ->
{:ok, Policies.update_policies(policies, :presence, :read, false)}

{:error, error} ->
log_error(
"UnableToSetPolicies",
"Error getting policies for connection: #{to_log(error)}"
)

Postgrex.rollback(conn, error)
end
end

@impl true
def check_write_policies(_conn, policies, %Authorization{topic: nil}) do
{:ok, Policies.update_policies(policies, :presence, :write, false)}
end

def check_write_policies(conn, policies, %Authorization{topic: topic}) do
changeset = Message.changeset(%Message{}, %{topic: topic, extension: :presence})

case Repo.insert(conn, changeset, Message, mode: :savepoint) do
{:ok, %Message{}} ->
{:ok, Policies.update_policies(policies, :presence, :write, true)}

{:error, %Postgrex.Error{postgres: %{code: :insufficient_privilege}}} ->
{:ok, Policies.update_policies(policies, :presence, :write, false)}

{:error, error} ->
log_error(
"UnableToSetPolicies",
"Error getting policies for connection: #{to_log(error)}"
)

Postgrex.rollback(conn, error)
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.61",
version: "2.33.62",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading

0 comments on commit 4eaa930

Please sign in to comment.