Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breakdown of jobs by state #63

Merged
merged 14 commits into from
Nov 11, 2024
4 changes: 3 additions & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
# Need to increase it because of <.label_value_list /> that does a <pre> tag, which will look broken with extra whitespace.
heex_line_length: 300
]
222 changes: 166 additions & 56 deletions lib/oban/live_dashboard.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,41 @@ defmodule Oban.LiveDashboard do
import Phoenix.LiveDashboard.Helpers, only: [format_value: 2]
import Ecto.Query

@impl true
def menu_link(_, _) do
{:ok, "Oban"}
end
@per_page_limits [20, 50, 100]

@oban_sorted_job_states [
"executing",
"available",
"scheduled",
"retryable",
"cancelled",
"discarded",
"completed"
]

@impl true
def render(assigns) do
~H"""
<.live_table
id="oban_jobs"
dom_id="oban-jobs"
page={@page}
row_attrs={&row_attrs/1}
row_fetcher={&fetch_jobs/2}
title="Oban Jobs"
search={false}
>
<:col field={:id} header="ID" sortable={:desc} />
<:col field={:state} sortable={:desc} />
<:col field={:queue} sortable={:desc} />
<:col field={:worker} sortable={:desc} />
<:col :let={job} field={:attempt} header="Attempts" sortable={:desc}>
<%= job.attempt %>/<%= job.max_attempts %>
</:col>
<:col :let={job} field={:inserted_at} sortable={:desc}>
<%= format_value(job.inserted_at) %>
</:col>
<:col :let={job} field={:scheduled_at} sortable={:desc}>
<%= format_value(job.scheduled_at) %>
</:col>
</.live_table>
<.live_modal
:if={@job != nil}
id="modal"
title="Job"
return_to={live_dashboard_path(@socket, @page, params: %{})}
>
<h5 class="mb-3">Oban</h5>
<.live_nav_bar id="oban_states" page={@page} nav_param="job_state" style={:bar} extra_params={["nav"]}>
<:item :for={{job_state, count} <- @job_state_counts} name={job_state} label={job_state_label(job_state, count)} method="navigate">
<.live_table id="oban_jobs" limit={per_page_limits()} dom_id={"oban-jobs-#{job_state}"} page={@page} row_attrs={&row_attrs/1} row_fetcher={&fetch_jobs(&1, &2, job_state)} default_sort_by={@timestamp_field} title="" search={false}>
<:col :let={job} field={:worker} sortable={:desc}>
<p class="font-weight-bold m-0"><%= job.worker %></p>
<pre class="font-weight-lighter text-muted m-0"><%= truncate(inspect(job.args)) %></pre>
</:col>
<:col :let={job} field={:attempt} header="Attempt" sortable={:desc}>
<%= job.attempt %>/<%= job.max_attempts %>
</:col>
<:col field={:queue} header="Queue" sortable={:desc} />
<:col :let={job} field={@timestamp_field} sortable={:desc}>
<%= format_value(timestamp(job, @timestamp_field)) %>
</:col>
</.live_table>
</:item>
</.live_nav_bar>

<.live_modal :if={@job != nil} id="job-modal" title={"Job - #{@job.id}"} return_to={live_dashboard_path(@socket, @page, params: %{})}>
<.label_value_list>
<:elem label="ID"><%= @job.id %></:elem>
<:elem label="State"><%= @job.state %></:elem>
Expand All @@ -53,24 +51,39 @@ defmodule Oban.LiveDashboard do
<:elem label="Attempts"><%= @job.attempt %>/<%= @job.max_attempts %></:elem>
<:elem label="Priority"><%= @job.priority %></:elem>
<:elem label="Attempted at"><%= format_value(@job.attempted_at) %></:elem>
<:elem :if={@job.cancelled_at} label="Cancelled at">
<%= format_value(@job.cancelled_at) %>
</:elem>
<:elem :if={@job.completed_at} label="Completed at">
<%= format_value(@job.completed_at) %>
</:elem>
<:elem :if={@job.discarded_at} label="Discarded at">
<%= format_value(@job.discarded_at) %>
</:elem>
<:elem :if={@job.cancelled_at} label="Cancelled at"><%= format_value(@job.cancelled_at) %></:elem>
<:elem :if={@job.completed_at} label="Completed at"><%= format_value(@job.completed_at) %></:elem>
<:elem :if={@job.discarded_at} label="Discarded at"><%= format_value(@job.discarded_at) %></:elem>
<:elem label="Inserted at"><%= format_value(@job.inserted_at) %></:elem>
<:elem label="Scheduled at"><%= format_value(@job.scheduled_at) %></:elem>
</.label_value_list>
</.live_modal>
"""
end

@impl true
def mount(params, _, socket) do
socket =
socket
|> assign(job_state: Map.get(params, "job_state", "executing"))
|> assign(sort_by: Map.get(params, "job_state"))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be moved to handle_params otherwise sort_by and job_state won't get updated except for when the view is re-mounted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. handle_params deserves a refactor later IMO.


{:ok, socket}
end

@impl true
def menu_link(_, _) do
{:ok, "Oban"}
end

@impl true
def handle_params(%{"params" => %{"job" => job_id}}, _url, socket) do
socket =
socket
|> assign(job: nil)
|> assign_job_state_counts()
|> assign_timestamp_field()

case fetch_job(job_id) do
{:ok, job} ->
{:noreply, assign(socket, job: job)}
Expand All @@ -81,8 +94,14 @@ defmodule Oban.LiveDashboard do
end
end

def handle_params(_params, _url, socket) do
{:noreply, assign(socket, job: nil)}
def handle_params(_params, _uri, socket) do
socket =
socket
|> assign(job: nil)
|> assign_job_state_counts()
|> assign_timestamp_field()

{:noreply, socket}
end

@impl true
Expand All @@ -91,28 +110,89 @@ defmodule Oban.LiveDashboard do
{:noreply, push_patch(socket, to: to)}
end

defp fetch_jobs(params, _node) do
total_jobs = Oban.Repo.aggregate(Oban.config(), Oban.Job, :count)
jobs = Oban.Repo.all(Oban.config(), jobs_query(params)) |> Enum.map(&Map.from_struct/1)
@impl true
def handle_refresh(socket) do
socket =
socket
|> assign_job_state_counts()

{:noreply, socket}
egze marked this conversation as resolved.
Show resolved Hide resolved
end

defp assign_job_state_counts(socket) do
job_state_counts_in_db =
Oban.Repo.all(
Oban.config(),
Oban.Job
|> group_by([j], [j.state])
|> order_by([j], [j.state])
|> select([j], {j.state, count(j.id)})
)
|> Enum.into(%{})

job_state_counts =
for job_state <- @oban_sorted_job_states,
do: {job_state, Map.get(job_state_counts_in_db, job_state, 0)}

total_count = Keyword.values(job_state_counts) |> Enum.sum()
job_state_counts = [{"all", total_count} | job_state_counts]

assign(socket, job_state_counts: job_state_counts)
end

defp job_state_label(job_state, count) do
"#{job_state} - (#{count})"
egze marked this conversation as resolved.
Show resolved Hide resolved
end

defp fetch_jobs(params, _node, job_state) do
total_jobs = Oban.Repo.aggregate(Oban.config(), jobs_count_query(job_state), :count)

jobs =
Oban.Repo.all(Oban.config(), jobs_query(params, job_state)) |> Enum.map(&Map.from_struct/1)

{jobs, total_jobs}
end

defp fetch_job(id) do
case Oban.Repo.get(Oban.config(), Oban.Job, id) do
nil ->
:error

job ->
%Oban.Job{} = job ->
{:ok, job}

_ ->
:error
end
end

defp jobs_query(%{sort_by: sort_by, sort_dir: sort_dir, limit: l}) do
defp jobs_query(%{sort_by: sort_by, sort_dir: sort_dir, limit: limit}, "all" = _job_state) do
egze marked this conversation as resolved.
Show resolved Hide resolved
Oban.Job
|> limit(^limit)
|> order_by({^sort_dir, ^sort_by})
end

defp jobs_query(params, job_state) do
Oban.Job
|> limit(^l)
|> filter_by_job_state(job_state)
|> filter_by_params(params)
end

defp jobs_count_query("all") do
Oban.Job
end

defp jobs_count_query(job_state) do
filter_by_job_state(Oban.Job, job_state)
end

defp filter_by_params(queryable, %{sort_by: sort_by, sort_dir: sort_dir, limit: limit}) do
queryable
|> limit(^limit)
|> order_by({^sort_dir, ^sort_by})
end

defp filter_by_job_state(queryable, job_state) do
where(queryable, [job], job.state == ^job_state)
end

defp row_attrs(job) do
[
{"phx-click", "show_job"},
Expand All @@ -125,9 +205,39 @@ defmodule Oban.LiveDashboard do
Enum.map(errors, &Map.get(&1, "error"))
end

def format_value(%DateTime{} = datetime) do
DateTime.to_string(datetime)
defp format_value(%DateTime{} = datetime) do
Calendar.strftime(datetime, "%Y-%m-%d %H:%M:%S")
end

defp format_value(value), do: value

defp timestamp(job, timestamp_field) do
Map.get(job, timestamp_field)
end

defp assign_timestamp_field(%{assigns: %{job_state: job_state}} = socket) do
timestamp_field =
case job_state do
"available" -> :scheduled_at
"cancelled" -> :cancelled_at
"completed" -> :completed_at
"discarded" -> :discarded_at
"executing" -> :attempted_at
"retryable" -> :scheduled_at
"scheduled" -> :scheduled_at
_ -> :inserted_at
end

assign(socket, timestamp_field: timestamp_field)
end

defp truncate(string, max_length \\ 50) do
if String.length(string) > max_length do
String.slice(string, 0, max_length) <> "…"
else
string
end
end

def format_value(nil), do: nil
defp per_page_limits, do: @per_page_limits
end
62 changes: 57 additions & 5 deletions test/oban/live_dashboard_test.exs
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add tests for filtering between states.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done a68eec2

I noticed that I had to do a Oban.LiveDashboardTest.Repo.delete_all(Oban.Job) in the test, because somehow the values from other tests were still there. Maybe something that you can look into later. Ideally the queries should be rolled back after each test.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the heads up. I'll look into proper tear downs.

Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,75 @@ defmodule Oban.LiveDashboardTest do
test "shows jobs with limit" do
for _ <- 1..110, do: job_fixture()
{:ok, live, rendered} = live(build_conn(), "/dashboard/oban")
assert rendered |> :binary.matches("<td class=\"oban-jobs-id\"") |> length() <= 100

assert rendered |> :binary.matches("<td class=\"oban-jobs-all-worker\"") |> length() ==
20

rendered = render_patch(live, "/dashboard/oban?limit=100")
assert rendered |> :binary.matches("<td class=\"oban-jobs-id\"") |> length() == 100

assert rendered |> :binary.matches("<td class=\"oban-jobs-all-worker\"") |> length() ==
100
end

test "switch between states" do
Oban.LiveDashboardTest.Repo.delete_all(Oban.Job)
_executing_job = job_fixture(%{"foo" => "executing"}, "executing")
_completed_job = job_fixture(%{"foo" => "completed"}, "completed")

conn = build_conn()
{:ok, live, rendered} = live(conn, "/dashboard/oban")

assert rendered |> :binary.matches("<td class=\"oban-jobs-all-worker\"") |> length() == 2

{:ok, live, rendered} =
live
|> element("a", "executing - (1)")
|> render_click()
|> follow_redirect(conn)

assert rendered
|> :binary.matches("<td class=\"oban-jobs-executing-worker\"")
|> length() == 1

{:ok, live, rendered} =
live
|> element("a", "completed - (1)")
|> render_click()
|> follow_redirect(conn)

assert rendered
|> :binary.matches("<td class=\"oban-jobs-completed-worker\"")
|> length() == 1

{:ok, _live, rendered} =
live
|> element("a", "scheduled - (0)")
|> render_click()
|> follow_redirect(conn)

assert rendered
|> :binary.matches("<td class=\"oban-jobs-scheduled-worker\"")
|> length() == 0
end

test "shows job info modal" do
job = job_fixture(%{something: "foobar"})
{:ok, live, rendered} = live(build_conn(), "/dashboard/oban?params[job]=#{job.id}")
{:ok, live, _rendered} = live(build_conn(), "/dashboard/oban?params[job]=#{job.id}")
rendered = render(live)
assert rendered =~ "modal-content"
assert rendered =~ "%{&quot;something&quot; =&gt; &quot;foobar&quot;}"
refute live |> element("#modal-close") |> render_click() =~ "modal"
end

defp job_fixture(args \\ %{}) do
{:ok, job} = Oban.Job.new(args, worker: "FakeWorker") |> Oban.insert()
defp job_fixture(args \\ %{}, state \\ "executing") do
{:ok, job} =
Oban.Job.new(args,
worker: "FakeWorker",
state: state,
attempted_at: DateTime.utc_now()
)
|> Oban.insert()
egze marked this conversation as resolved.
Show resolved Hide resolved

job
end
end