From 4ceb667166b00440dd851971aa22d2818913e3b9 Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 17:29:36 -0700 Subject: [PATCH 01/13] Update worker.py --- examples/simple/worker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/simple/worker.py b/examples/simple/worker.py index 1eca4c79..88c14ccf 100644 --- a/examples/simple/worker.py +++ b/examples/simple/worker.py @@ -1,3 +1,4 @@ +#Python import time from dotenv import load_dotenv @@ -22,11 +23,12 @@ def step1(self, context: Context): def main(): + #START registering_workflows_starting_workers workflow = MyWorkflow() worker = hatchet.worker("test-worker", max_runs=1) worker.register_workflow(workflow) worker.start() - + #END registering_workflows_starting_workers if __name__ == "__main__": main() From 45837dffbbc2dc22c4f41e0d3dbb02ba756722f2 Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 18:15:42 -0700 Subject: [PATCH 02/13] Update worker.py --- examples/concurrency_limit/worker.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/concurrency_limit/worker.py b/examples/concurrency_limit/worker.py index b4820628..15ab3df7 100644 --- a/examples/concurrency_limit/worker.py +++ b/examples/concurrency_limit/worker.py @@ -1,3 +1,4 @@ +#Python import time from dotenv import load_dotenv @@ -27,7 +28,9 @@ def step1(self, context: Context): def main(): workflow = ConcurrencyDemoWorkflow() + #START setting-concurrency-on-workers worker = hatchet.worker("concurrency-demo-worker", max_runs=10) + #END setting-concurrency-on-workers worker.register_workflow(workflow) worker.start() From d66c0839fb179476610ffc36051acb6688471093 Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 18:37:49 -0700 Subject: [PATCH 03/13] Update worker.py --- examples/concurrency_limit/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/concurrency_limit/worker.py b/examples/concurrency_limit/worker.py index 15ab3df7..16b0b5ef 100644 --- a/examples/concurrency_limit/worker.py +++ b/examples/concurrency_limit/worker.py @@ -10,7 +10,7 @@ hatchet = Hatchet(debug=True) - +#START concurrency_cancel_in_progress @hatchet.workflow(on_events=["concurrency-test"]) class ConcurrencyDemoWorkflow: @@ -24,7 +24,7 @@ def step1(self, context: Context): time.sleep(3) print("executed step1") return {"run": input["run"]} - +#END concurrency_cancel_in_progress def main(): workflow = ConcurrencyDemoWorkflow() From 4160bfb4f39b80642ff3f1a0e2a97e9cc654e74f Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 18:48:01 -0700 Subject: [PATCH 04/13] Update worker.py --- examples/concurrency_limit_rr/worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/concurrency_limit_rr/worker.py b/examples/concurrency_limit_rr/worker.py index be58ed81..66799092 100644 --- a/examples/concurrency_limit_rr/worker.py +++ b/examples/concurrency_limit_rr/worker.py @@ -1,3 +1,4 @@ +#Python import time from dotenv import load_dotenv @@ -8,7 +9,7 @@ hatchet = Hatchet(debug=True) - +#START concurrency_group_red_robin @hatchet.workflow(on_events=["concurrency-test"], schedule_timeout="10m") class ConcurrencyDemoWorkflowRR: @hatchet.concurrency( @@ -25,7 +26,7 @@ def step1(self, context): time.sleep(2) print("finished step1") pass - +#END concurrency_group_red_robin workflow = ConcurrencyDemoWorkflowRR() worker = hatchet.worker("concurrency-demo-worker-rr", max_runs=10) From ed3edbcba51bbe1880b0acfacee6dfa558b644e8 Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 18:53:24 -0700 Subject: [PATCH 05/13] Update worker.py --- examples/simple/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/simple/worker.py b/examples/simple/worker.py index 88c14ccf..a7c6718f 100644 --- a/examples/simple/worker.py +++ b/examples/simple/worker.py @@ -9,7 +9,7 @@ hatchet = Hatchet(debug=True) - +#START how-to-use-step-level-retries @hatchet.workflow(on_events=["user:create"]) class MyWorkflow: @hatchet.step(timeout="11s", retries=3) @@ -20,7 +20,7 @@ def step1(self, context: Context): return { "step1": "step1", } - +#END how-to-use-step-level-retries def main(): #START registering_workflows_starting_workers From 3acb4be4f404e73e9ca02402de0b58b90b86ecde Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 19:22:34 -0700 Subject: [PATCH 06/13] Update worker.py --- examples/timeout/worker.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/examples/timeout/worker.py b/examples/timeout/worker.py index 07885ba0..d1a81beb 100644 --- a/examples/timeout/worker.py +++ b/examples/timeout/worker.py @@ -1,3 +1,4 @@ +#Python import time from dotenv import load_dotenv @@ -8,16 +9,17 @@ hatchet = Hatchet(debug=True) - +#START scheduling-timeouts @hatchet.workflow(on_events=["timeout:create"]) class TimeoutWorkflow: - +#END scheduling-timeouts +#START step-timeouts @hatchet.step(timeout="4s") def step1(self, context: Context): time.sleep(5) return {"status": "success"} - - +#END step-timeouts +#START refreshing-timeouts @hatchet.workflow(on_events=["refresh:create"]) class RefreshTimeoutWorkflow: @@ -28,7 +30,7 @@ def step1(self, context: Context): time.sleep(5) return {"status": "success"} - +#END refreshing-timeouts def main(): worker = hatchet.worker("timeout-worker", max_runs=4) From 129c54daa17acb64fcd627ca9a202f10519467f9 Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 19:38:41 -0700 Subject: [PATCH 07/13] Update worker.py --- examples/on_failure/worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/on_failure/worker.py b/examples/on_failure/worker.py index 804203a3..b60a8f3d 100644 --- a/examples/on_failure/worker.py +++ b/examples/on_failure/worker.py @@ -1,3 +1,4 @@ +#Python import json from dotenv import load_dotenv @@ -8,7 +9,7 @@ hatchet = Hatchet(debug=True) - +#START defining-an-on-failure-step @hatchet.workflow(on_events=["user:create"]) class OnFailureWorkflow: @hatchet.step(timeout="1s") @@ -23,7 +24,7 @@ def on_failure(self, context: Context): if len(failures) == 1 and "step1 failed" in failures[0]["error"]: return {"status": "success"} raise Exception("unexpected failure") - +#END defining-an-on-failure-step def main(): workflow = OnFailureWorkflow() From d1fbf10aa89db3eaff13df083f60fe1f884d4ae5 Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 19:49:42 -0700 Subject: [PATCH 08/13] Update stream.py --- examples/manual_trigger/stream.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/manual_trigger/stream.py b/examples/manual_trigger/stream.py index 07d54749..4136261f 100644 --- a/examples/manual_trigger/stream.py +++ b/examples/manual_trigger/stream.py @@ -1,3 +1,4 @@ +#Python import asyncio import base64 import json @@ -9,7 +10,7 @@ from hatchet_sdk.clients.admin import TriggerWorkflowOptions from hatchet_sdk.clients.run_event_listener import StepRunEventType - +#START listeners async def main(): load_dotenv() hatchet = new_client() @@ -49,7 +50,7 @@ async def main(): result = await workflowRun.result() print("result: " + json.dumps(result, indent=2)) - +#END listeners if __name__ == "__main__": asyncio.run(main()) From 1ab4fa9bb81deacce8ade04c9148b3793948b64f Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 19:58:40 -0700 Subject: [PATCH 09/13] Update stream.py --- examples/fanout/stream.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/fanout/stream.py b/examples/fanout/stream.py index ce7b5d52..ad8bfb09 100644 --- a/examples/fanout/stream.py +++ b/examples/fanout/stream.py @@ -1,3 +1,4 @@ +#Python import asyncio import base64 import json @@ -10,7 +11,7 @@ from hatchet_sdk.clients.admin import TriggerWorkflowOptions from hatchet_sdk.clients.run_event_listener import StepRunEventType - +#START streaming-by-additional-metadata async def main(): load_dotenv() hatchet = new_client() @@ -38,7 +39,7 @@ async def main(): async for event in listener: print(event.type, event.payload) - +#END streaming-by-additional-metadata if __name__ == "__main__": asyncio.run(main()) From 2dcc4f31c0d844c1e74908b97d275dd57f88c7f9 Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 20:12:53 -0700 Subject: [PATCH 10/13] Update worker.py --- examples/rate_limit/worker.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/examples/rate_limit/worker.py b/examples/rate_limit/worker.py index d773b606..925fe6c7 100644 --- a/examples/rate_limit/worker.py +++ b/examples/rate_limit/worker.py @@ -1,3 +1,4 @@ +#Python from dotenv import load_dotenv from hatchet_sdk import Context, Hatchet @@ -7,19 +8,19 @@ hatchet = Hatchet(debug=True) - +#START consuming-rate-limits @hatchet.workflow(on_events=["rate_limit:create"]) class RateLimitWorkflow: - @hatchet.step(rate_limits=[RateLimit(key="test-limit", units=1)]) def step1(self, context: Context): print("executed step1") pass - +#END consuming-rate-limits def main(): + #START declaring-global-limits hatchet.admin.put_rate_limit("test-limit", 2, RateLimitDuration.SECOND) - + #END declaring-global-limits worker = hatchet.worker("rate-limit-worker", max_runs=10) worker.register_workflow(RateLimitWorkflow()) From 4218942707d34d57a8db66cdae896114dabb9bd7 Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 20:23:55 -0700 Subject: [PATCH 11/13] Update worker.py --- examples/sticky-workers/worker.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/sticky-workers/worker.py b/examples/sticky-workers/worker.py index 48921978..dddff283 100644 --- a/examples/sticky-workers/worker.py +++ b/examples/sticky-workers/worker.py @@ -1,3 +1,4 @@ +#Python from dotenv import load_dotenv from hatchet_sdk import Context, Hatchet, StickyStrategy @@ -6,7 +7,7 @@ hatchet = Hatchet(debug=True) - +#START setting-sticky-assignment @hatchet.workflow(on_events=["sticky:parent"], sticky=StickyStrategy.SOFT) class StickyWorkflow: @hatchet.step() @@ -27,14 +28,15 @@ async def step2(self, context: Context): await ref.result() return {"worker": context.worker.id()} +#END setting-sticky-assignment - +#START #sticky-child-workflows @hatchet.workflow(on_events=["sticky:child"], sticky=StickyStrategy.SOFT) class StickyChildWorkflow: @hatchet.step() def child(self, context: Context): return {"worker": context.worker.id()} - +#END sticky-child-workflows worker = hatchet.worker("sticky-worker", max_runs=10) worker.register_workflow(StickyWorkflow()) From b78005d0d4e7b70a8de0c16ee187d1544ad8630f Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 20:44:27 -0700 Subject: [PATCH 12/13] Update worker.py --- examples/affinity-workers/worker.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/examples/affinity-workers/worker.py b/examples/affinity-workers/worker.py index 7c280f02..310462ec 100644 --- a/examples/affinity-workers/worker.py +++ b/examples/affinity-workers/worker.py @@ -1,3 +1,4 @@ +#Python from dotenv import load_dotenv from hatchet_sdk import Context, Hatchet, WorkerLabelComparator @@ -6,7 +7,7 @@ hatchet = Hatchet(debug=True) - +#START specifying-step-desired-labels @hatchet.workflow(on_events=["affinity:run"]) class AffinityWorkflow: @hatchet.step( @@ -26,9 +27,11 @@ async def step(self, context: Context): context.worker.upsert_labels({"model": "fancy-ai-model-v2"}) return {"worker": context.worker.id()} +#END specifying-step-desired-labels def main(): + #START specifying-worker-labels worker = hatchet.worker( "affinity-worker", max_runs=10, @@ -37,6 +40,7 @@ def main(): "memory": 512, }, ) + #END specifying-worker-labels worker.register_workflow(AffinityWorkflow()) worker.start() From 378d4acefb58adc18e93bbf9f59e6fdc4a8e704b Mon Sep 17 00:00:00 2001 From: TranquilVarun <108307732+TranquilVarun@users.noreply.github.com> Date: Fri, 23 Aug 2024 20:52:02 -0700 Subject: [PATCH 13/13] Update worker.py --- examples/cancellation/worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/cancellation/worker.py b/examples/cancellation/worker.py index b495e05d..1d9c8c6d 100644 --- a/examples/cancellation/worker.py +++ b/examples/cancellation/worker.py @@ -1,3 +1,4 @@ +#Python import asyncio from dotenv import load_dotenv @@ -8,7 +9,7 @@ hatchet = Hatchet(debug=True) - +#START cancellation-mechanisms @hatchet.workflow(on_events=["user:create"]) class CancelWorkflow: @hatchet.step(timeout="10s", retries=1) @@ -21,7 +22,7 @@ async def step1(self, context: Context): if context.exit_flag: print("Cancelled") - +#END cancellation-mechanisms workflow = CancelWorkflow() worker = hatchet.worker("cancellation-worker", max_runs=4)