Migrating from Prefect 1.x to Prefect 2.x #369
annshress started this conversation in Show and tell
You can tell a flow to return the state like this (see the sketch below; from: https://docs.prefect.io/2.14.3/concepts/states/#return-prefect-state). This should work the same if the return value is [...]. What complicates matters is that many Hedwig tasks are Mapped, so the return value is a List of States (or Futures).
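A minimal sketch of that pattern, assuming a trivial flow (names are illustrative):

```python
from prefect import flow, task

@task
def add_one(x: int) -> int:
    return x + 1

@flow
def my_flow() -> int:
    return add_one(1)

# Calling with return_state=True yields the final State instead of the value;
# .is_completed() checks for success and .result() unwraps the return value.
state = my_flow(return_state=True)
assert state.is_completed()
assert state.result() == 2
```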
(This is a work in progress. Please comment below if there is anything relevant that should be added to this topic.)
Prefect Recipes
Changes in workflow code
Flow registration now occurs through a decorator: @flow.

upstream_tasks changes to wait_for.
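A minimal sketch of both changes, with illustrative task names:

```python
from prefect import flow, task

@task
def extract():
    return [1, 2, 3]

@task
def report():
    print("extraction finished")

@flow  # Prefect 2: decorating the function is the "registration"; no explicit register step
def pipeline():
    data = extract.submit()
    # Prefect 1's upstream_tasks keyword becomes wait_for in Prefect 2:
    # report runs only after extract has finished, even without a data dependency.
    report.submit(wait_for=[data])
```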
❗️NOTE❗️ Upstream task failure does not trigger failure in downstream tasks. In Prefect 1.x there is a state called TriggerFailed, created when an upstream task ends in a Failed state. Read More. In Prefect 2.x, TriggerFailed does not exist; the downstream task stays in a NotReady state because the upstream task did not Complete. In order to allow TriggerFailed-like behavior, we should use the allow_failure(previous_task_run) annotation. [Annotations Docs] ... More information below.
There are state change hooks such as on_completion and on_failure (check the docs).
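A hedged sketch of such hooks on a flow; the hook names are illustrative, and each hook receives the flow, the flow run, and the final state:

```python
from prefect import flow

def notify_success(flow, flow_run, state):
    # Called when the flow run reaches a Completed state
    print(f"{flow_run.name} finished: {state.type}")

def notify_failure(flow, flow_run, state):
    # Called when the flow run reaches a Failed state
    print(f"{flow_run.name} failed: {state.message}")

@flow(on_completion=[notify_success], on_failure=[notify_failure])
def my_flow():
    return "done"
```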
We might need some tasks to always run no matter what happens to the previous task (for example, cleaning up temporary directories). For that we can use the allow_failure annotation. Check here.
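A minimal sketch of that cleanup pattern, assuming hypothetical task names:

```python
from prefect import allow_failure, flow, task

@task
def process(path: str):
    raise ValueError(f"could not process {path}")

@task
def cleanup_tmp_dirs():
    print("removing temporary directories")

@flow
def pipeline():
    result = process.submit("/tmp/data")
    # Without allow_failure, cleanup_tmp_dirs would stay NotReady because its
    # upstream dependency failed; wrapping the future lets it run regardless.
    cleanup_tmp_dirs.submit(wait_for=[allow_failure(result)])
```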
If you are using Dask, the DaskTaskRunner is now its own package. Link
Prefect captures all exceptions and re-raises them. In Prefect 1, we normally raised signals.Fail to make the process smoother. You can still retain that behavior by using return_state=True.

Subflows. One big takeaway of Prefect 2 is the ease of using subflows. With subflows, you can have different task runners acting on different tasks under the same flow. Docs
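A minimal sketch, with illustrative tasks, of a subflow running on a different task runner than its parent:

```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner, SequentialTaskRunner

@task
def convert(item: int) -> int:
    return item * 2

@flow(task_runner=ConcurrentTaskRunner())
def convert_all(items: list[int]):
    # The subflow fans its tasks out concurrently on its own task runner...
    return [convert.submit(i) for i in items]

@flow(task_runner=SequentialTaskRunner())
def parent():
    # ...while the parent flow keeps its own, sequential task runner.
    return convert_all([1, 2, 3])
```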
Upstream tasks failed
This test in Prefect is a good example of how Prefect handles upstream failures. The downstream tasks are moved to a Pending state named NotReady, which effectively behaves like a terminal state even though it is not one. This impacts the execution of downstream tasks, especially if you are using the wait_for argument and the upstream task you are waiting for has failed: the given task is never executed. To handle this, try to minimize the use of wait_for and instead inject the upstream dependencies as arguments to the task function call (as shown in the example). In the end, you could manually get the state result as sketched below.
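A hedged sketch of doing that manually with mapped futures (the task name is illustrative):

```python
from prefect import flow, task

@task
def convert(path: str) -> str:
    if path.endswith("bad"):
        raise ValueError(f"cannot convert {path}")
    return path + ".png"

@flow
def collect_results(paths: list[str]) -> list[str]:
    futures = convert.map(paths)
    # Resolve each mapped future to its final State, then keep only the
    # results of the children that actually completed.
    states = [future.wait() for future in futures]
    return [state.result() for state in states if state.is_completed()]
```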
Importance of having good task dependencies
Aim to return a single important entity that is used in the downstream tasks. Assume we have the following tasks:
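A hypothetical sketch of the two approaches; only task_jpg_to_png comes from this write-up, the other names are illustrative:

```python
from prefect import flow, task

@task
def task_jpg_to_png(jpg: str) -> str:
    return jpg.replace(".jpg", ".png")

@task
def task_make_montage(png: str) -> str:
    return png.replace(".png", "_montage.png")

# Approach 1: wait_for only expresses ordering. Every downstream child waits
# on the entire list of png futures, so a single failure blocks all of them.
@flow
def pipeline_with_wait_for(jpgs: list[str]):
    pngs = task_jpg_to_png.map(jpgs)
    expected_pngs = [j.replace(".jpg", ".png") for j in jpgs]
    task_make_montage.map(expected_pngs, wait_for=pngs)

# Approach 2: pass the png futures directly as inputs, so each downstream
# child depends only on its own upstream element.
@flow
def pipeline_with_futures(jpgs: list[str]):
    pngs = task_jpg_to_png.map(jpgs)
    task_make_montage.map(pngs)
```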
In the second approach we avoided the wait_for argument because we passed a Prefect future (pngs) directly as an input to the next task.

To understand the subtle difference between the two pieces of code, try the following: in the task task_jpg_to_png, raise an exception for only one of the inputs. You will see that in the first approach the downstream task won't run for any of the mapped inputs, while in the second approach it runs for the other, non-failed inputs.

Getting Upstream Task Dependency
Scenario: Assume one of our downstream tasks (let's call it DF, for 'Downstream Failed') did not run, which means it is still in the Pending (NotReady) state. To find the culprit upstream task, we need to know which tasks DF depended on.

So we need to get the task_run_id of DF and walk up the stream. Since task run ids are generated by the Prefect server rather than by Prefect workers, we need to make API calls to the server to get the input task_run ids that DF depends on (the API is GET /api/task_runs/{id}). This is a cumbersome approach with a lot of API calls, which can be slow, so we are looking into how we can get this information while staying on the worker side.
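A rough sketch of that traversal using the Prefect 2 Python client; it assumes the stuck task run's id is known and relies on the task_inputs field of the task-run schema:

```python
import asyncio
from uuid import UUID

from prefect.client.orchestration import get_client

async def upstream_task_run_ids(task_run_id: str) -> list[str]:
    async with get_client() as client:
        # Equivalent to GET /api/task_runs/{id}
        task_run = await client.read_task_run(UUID(task_run_id))
        ids = []
        for refs in task_run.task_inputs.values():
            for ref in refs:
                # TaskRunResult references carry the upstream task run's id;
                # Parameter/Constant inputs do not, so they are skipped.
                run_id = getattr(ref, "id", None)
                if run_id is not None:
                    ids.append(str(run_id))
        return ids

# asyncio.run(upstream_task_run_ids("<task-run-uuid>"))
```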
Logging
You cannot access prefect.context.logger outside a flow or a task. In Prefect 2 you will get a MissingContextError if you call get_run_logger outside a flow/task. The way to access the logger is explained here. [This needs elaboration]
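A minimal sketch of the Prefect 2 pattern with get_run_logger:

```python
from prefect import flow, get_run_logger, task

@task
def do_work():
    logger = get_run_logger()   # raises MissingContextError outside a run context
    logger.info("working")

@flow
def my_flow():
    logger = get_run_logger()
    logger.info("starting")
    do_work()
```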
Testing

Testing flows: Without prefect_test_harness(), the flow calls in your tests will be registered with your real Prefect server.

Flows that return None are tricky to test. In Prefect 1, we could simply do [...]. In Prefect 2, the final state is a list of states, as mentioned here. Although this is already logged in the final state result as follows, [...] tests still get a list of states. To fix this, you can add return_state=True to your flow calls, as shown here (credit: @mbopfNIH)...
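A sketch of the pattern, assuming pytest; prefect_test_harness gives the test a temporary local database, and return_state=True lets us assert on the flow's final State rather than its (possibly None) return value:

```python
from prefect import flow
from prefect.testing.utilities import prefect_test_harness

@flow
def etl_flow() -> None:
    return None

def test_etl_flow():
    # Runs against an isolated, temporary Prefect database,
    # not the real Prefect server.
    with prefect_test_harness():
        state = etl_flow(return_state=True)
        assert state.is_completed()
```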
Deploying Prefect Server
The Prefect server acts as a storage/queue for the world to submit their jobs to. The current deployment process used by Hedwig, built on Monarch Solutions and Terraform, should be extendable to other projects as well.
Current concerns:

The server is available to anyone on the NIAID VPN. Anyone can access the website and register a random flow, or constantly send flow run requests to the Prefect server, which are bound to be picked up by Prefect workers.

To tackle this, we followed a basic guide to add minimal authentication to the Prefect server (Guide). The secret values, such as the bearer token and username/password pairs, can be added into AWS secrets. The secrets can be rotated as well, which will require some extra work for retrieval.
Workers are preferred over Agents
Prefect 1 only had agents to retrieve and execute jobs from the Prefect server. Prefect 2 recommends using workers and work pools instead of agents.
Possible Upgrades
For applications with jobs/tasks that compete for resources, take advantage of dask-annotations.
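A hedged sketch based on the prefect-dask docs; the priority and resource values are illustrative and must match what your Dask workers advertise:

```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def crunch(x: int) -> int:
    return x ** 2

@flow(task_runner=DaskTaskRunner())
def annotated_flow():
    with dask.annotate(priority=10):
        urgent = crunch.submit(1)          # scheduled ahead of lower-priority work
    with dask.annotate(resources={"MEMORY": 4e9}):
        heavy = crunch.submit(2)           # only runs on workers advertising that resource
    return urgent, heavy
```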