Skip to content

Commit

Permalink
block on thumbnailing process instead of having it be weirdly fire-an…
Browse files Browse the repository at this point in the history
…d-forget (#782)

Fixes: #584
  • Loading branch information
zkat authored Feb 17, 2024
1 parent 40e1586 commit bd9d89a
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 72 deletions.
4 changes: 3 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ end

config :banchan, Banchan.Mailer, adapter: Bamboo.TestAdapter

config :banchan, Oban, testing: :inline
config :banchan, Oban,
testing: :inline,
notifier: Oban.Notifiers.PG

# We don't run a server during test. If one is required,
# you can enable the server option below.
Expand Down
16 changes: 4 additions & 12 deletions lib/banchan/commissions/commissions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1111,12 +1111,7 @@ defmodule Banchan.Commissions do
Thumbnailer.thumbnail(
upload,
target_size: "5kb",
dimensions: "128x128",
callback: [
Notifications,
:commission_event_updated,
[event.commission_id, event.id]
]
dimensions: "128x128"
)
else
{:ok, nil}
Expand All @@ -1127,12 +1122,7 @@ defmodule Banchan.Commissions do
Thumbnailer.thumbnail(
upload,
dimensions: "1200",
name: "preview.jpg",
callback: [
Notifications,
:commission_event_updated,
[event.commission_id, event.id]
]
name: "preview.jpg"
)
else
{:ok, nil}
Expand All @@ -1148,6 +1138,8 @@ defmodule Banchan.Commissions do
end)
end)

Notifications.commission_event_updated(event.commission_id, event.id)

:ok
end

Expand Down
17 changes: 11 additions & 6 deletions lib/banchan/offerings/offerings.ex
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,15 @@ defmodule Banchan.Offerings do
end
end)
|> case do
{:ok, ret} -> ret
{:error, error} -> {:error, error}
{:ok, {:ok, offering}} ->
__MODULE__.Notifications.notify_images_updated(offering)
{:ok, offering}

{:ok, {:error, error}} ->
{:error, error}

{:error, error} ->
{:error, error}
end
end

Expand Down Expand Up @@ -282,8 +289,7 @@ defmodule Banchan.Offerings do
Thumbnailer.thumbnail(
upload,
dimensions: "1200",
name: "card_image.jpg",
callback: [__MODULE__.Notifications, :notify_images_updated]
name: "card_image.jpg"
)

card
Expand All @@ -298,8 +304,7 @@ defmodule Banchan.Offerings do
Thumbnailer.thumbnail(
upload,
dimensions: "1200",
name: "gallery_image.jpg",
callback: [__MODULE__.Notifications, :notify_images_updated]
name: "gallery_image.jpg"
)

image
Expand Down
114 changes: 61 additions & 53 deletions lib/banchan/workers/thumbnailer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ defmodule Banchan.Workers.Thumbnailer do

@impl Oban.Worker
def perform(%_{
id: job_id,
args: %{
"src" => src_id,
"dest" => dest_id,
"opts" => opts
}
}) do
process(src_id, dest_id, opts)
process(job_id, src_id, dest_id, opts)
end

def thumbnail(upload, opts \\ [])
Expand All @@ -35,43 +36,60 @@ defmodule Banchan.Workers.Thumbnailer do
end

def thumbnail(%Upload{} = upload, opts) do
if !Uploads.image?(upload) && !Uploads.video?(upload) do
{:error, :unsupported_input}
else
Ecto.Multi.new()
|> Ecto.Multi.insert(
:pending,
Uploads.gen_pending(
%User{id: upload.uploader_id},
upload,
"image/jpeg",
Keyword.get(opts, :name, "thumbnail.jpg")
Task.async(fn ->
if !Uploads.image?(upload) && !Uploads.video?(upload) do
{:error, :unsupported_input}
else
:ok = Oban.Notifier.listen([:thumbnail_jobs])

Ecto.Multi.new()
|> Ecto.Multi.insert(
:pending,
Uploads.gen_pending(
%User{id: upload.uploader_id},
upload,
"image/jpeg",
Keyword.get(opts, :name, "thumbnail.jpg")
)
)
)
|> Ecto.Multi.run(:job, fn _repo, %{pending: pending} ->
Oban.insert(
__MODULE__.new(%{
src: upload.id,
dest: pending.id,
opts: %{
target_size: Keyword.get(opts, :target_size),
format: Keyword.get(opts, :format, "jpeg"),
dimensions: Keyword.get(opts, :dimensions),
callback: Keyword.get(opts, :callback)
}
})
)
end)
|> Repo.transaction()
|> case do
{:ok, %{pending: pending}} -> {:ok, pending}
{:error, _, error, _} -> {:error, error}
|> Ecto.Multi.run(:job, fn _repo, %{pending: pending} ->
Oban.insert(
__MODULE__.new(%{
src: upload.id,
dest: pending.id,
opts: %{
target_size: Keyword.get(opts, :target_size),
format: Keyword.get(opts, :format, "jpeg"),
dimensions: Keyword.get(opts, :dimensions)
}
})
)
end)
|> Repo.transaction()
|> case do
{:ok, %{pending: pending, job: %{id: job_id}}} ->
receive do
{:notification, :thumbnail_jobs, %{"complete" => ^job_id, "result" => "ok"}} ->
{:ok, Uploads.get_by_id!(pending.id)}

{:notification, :thumbnail_jobs,
%{"complete" => ^job_id, "result" => {"error", err}}} ->
{:error, String.to_existing_atom(err)}
after
Keyword.get(opts, :timeout) || 300_000 ->
{:error, :timeout}
end

{:error, _, error, _} ->
{:error, error}
end
end
end
end)
|> Task.await(Keyword.get(opts, :timeout) || 300_000)
end

# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
defp process(src_id, dest_id, opts) do
defp process(job_id, src_id, dest_id, opts) do
Repo.transaction(fn ->
src = Uploads.get_by_id!(src_id)
dest = Uploads.get_by_id!(dest_id)
Expand Down Expand Up @@ -166,26 +184,7 @@ defmodule Banchan.Workers.Thumbnailer do
{:ok, dest}
end)
|> case do
{:ok, {:ok, dest}} ->
case opts["callback"] do
[module, name, args] ->
apply(
String.to_existing_atom(module),
String.to_existing_atom(name),
args
)

[module, name] ->
apply(
String.to_existing_atom(module),
String.to_existing_atom(name),
[dest]
)

_ ->
nil
end

{:ok, {:ok, _}} ->
{:ok, dest_id}

{:ok, {:error, error}} ->
Expand All @@ -194,5 +193,14 @@ defmodule Banchan.Workers.Thumbnailer do
{:error, _} ->
{:error, :processing_failed}
end
|> case do
{:ok, dest_id} ->
Oban.Notifier.notify(:thumbnail_jobs, %{complete: job_id, result: :ok})
{:ok, dest_id}

{:error, err} ->
Oban.Notifier.notify(:thumbnail_jobs, %{complete: job_id, result: {:error, err}})
{:error, err}
end
end
end

0 comments on commit bd9d89a

Please sign in to comment.