Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python Example Delimeters #161

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
6 changes: 5 additions & 1 deletion examples/affinity-workers/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#Python
from dotenv import load_dotenv

from hatchet_sdk import Context, Hatchet, WorkerLabelComparator
Expand All @@ -6,7 +7,7 @@

hatchet = Hatchet(debug=True)


#START specifying-step-desired-labels
@hatchet.workflow(on_events=["affinity:run"])
class AffinityWorkflow:
@hatchet.step(
Expand All @@ -26,9 +27,11 @@ async def step(self, context: Context):
context.worker.upsert_labels({"model": "fancy-ai-model-v2"})

return {"worker": context.worker.id()}
#END specifying-step-desired-labels


def main():
#START specifying-worker-labels
worker = hatchet.worker(
"affinity-worker",
max_runs=10,
Expand All @@ -37,6 +40,7 @@ def main():
"memory": 512,
},
)
#END specifying-worker-labels
worker.register_workflow(AffinityWorkflow())
worker.start()

Expand Down
5 changes: 3 additions & 2 deletions examples/cancellation/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#Python
import asyncio

from dotenv import load_dotenv
Expand All @@ -8,7 +9,7 @@

hatchet = Hatchet(debug=True)


#START cancellation-mechanisms
@hatchet.workflow(on_events=["user:create"])
class CancelWorkflow:
@hatchet.step(timeout="10s", retries=1)
Expand All @@ -21,7 +22,7 @@ async def step1(self, context: Context):

if context.exit_flag:
print("Cancelled")

#END cancellation-mechanisms

workflow = CancelWorkflow()
worker = hatchet.worker("cancellation-worker", max_runs=4)
Expand Down
7 changes: 5 additions & 2 deletions examples/concurrency_limit/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#Python
import time

from dotenv import load_dotenv
Expand All @@ -9,7 +10,7 @@

hatchet = Hatchet(debug=True)


#START concurrency_cancel_in_progress
@hatchet.workflow(on_events=["concurrency-test"])
class ConcurrencyDemoWorkflow:

Expand All @@ -23,11 +24,13 @@ def step1(self, context: Context):
time.sleep(3)
print("executed step1")
return {"run": input["run"]}

#END concurrency_cancel_in_progress

def main():
workflow = ConcurrencyDemoWorkflow()
#START setting-concurrency-on-workers
worker = hatchet.worker("concurrency-demo-worker", max_runs=10)
#END setting-concurrency-on-workers
worker.register_workflow(workflow)

worker.start()
Expand Down
5 changes: 3 additions & 2 deletions examples/concurrency_limit_rr/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#Python
import time

from dotenv import load_dotenv
Expand All @@ -8,7 +9,7 @@

hatchet = Hatchet(debug=True)


#START concurrency_group_red_robin
@hatchet.workflow(on_events=["concurrency-test"], schedule_timeout="10m")
class ConcurrencyDemoWorkflowRR:
@hatchet.concurrency(
Expand All @@ -25,7 +26,7 @@ def step1(self, context):
time.sleep(2)
print("finished step1")
pass

#END concurrency_group_red_robin

workflow = ConcurrencyDemoWorkflowRR()
worker = hatchet.worker("concurrency-demo-worker-rr", max_runs=10)
Expand Down
5 changes: 3 additions & 2 deletions examples/fanout/stream.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#Python
import asyncio
import base64
import json
Expand All @@ -10,7 +11,7 @@
from hatchet_sdk.clients.admin import TriggerWorkflowOptions
from hatchet_sdk.clients.run_event_listener import StepRunEventType


#START streaming-by-additional-metadata
async def main():
load_dotenv()
hatchet = new_client()
Expand Down Expand Up @@ -38,7 +39,7 @@ async def main():

async for event in listener:
print(event.type, event.payload)

#END streaming-by-additional-metadata

if __name__ == "__main__":
asyncio.run(main())
5 changes: 3 additions & 2 deletions examples/manual_trigger/stream.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#Python
import asyncio
import base64
import json
Expand All @@ -9,7 +10,7 @@
from hatchet_sdk.clients.admin import TriggerWorkflowOptions
from hatchet_sdk.clients.run_event_listener import StepRunEventType


#START listeners
async def main():
load_dotenv()
hatchet = new_client()
Expand Down Expand Up @@ -49,7 +50,7 @@ async def main():
result = await workflowRun.result()

print("result: " + json.dumps(result, indent=2))

#END listeners

if __name__ == "__main__":
asyncio.run(main())
5 changes: 3 additions & 2 deletions examples/on_failure/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#Python
import json

from dotenv import load_dotenv
Expand All @@ -8,7 +9,7 @@

hatchet = Hatchet(debug=True)


#START defining-an-on-failure-step
@hatchet.workflow(on_events=["user:create"])
class OnFailureWorkflow:
@hatchet.step(timeout="1s")
Expand All @@ -23,7 +24,7 @@ def on_failure(self, context: Context):
if len(failures) == 1 and "step1 failed" in failures[0]["error"]:
return {"status": "success"}
raise Exception("unexpected failure")

#END defining-an-on-failure-step

def main():
workflow = OnFailureWorkflow()
Expand Down
9 changes: 5 additions & 4 deletions examples/rate_limit/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#Python
from dotenv import load_dotenv

from hatchet_sdk import Context, Hatchet
Expand All @@ -7,19 +8,19 @@

hatchet = Hatchet(debug=True)


#START consuming-rate-limits
@hatchet.workflow(on_events=["rate_limit:create"])
class RateLimitWorkflow:

@hatchet.step(rate_limits=[RateLimit(key="test-limit", units=1)])
def step1(self, context: Context):
print("executed step1")
pass

#END consuming-rate-limits

def main():
#START declaring-global-limits
hatchet.admin.put_rate_limit("test-limit", 2, RateLimitDuration.SECOND)

#END declaring-global-limits
worker = hatchet.worker("rate-limit-worker", max_runs=10)
worker.register_workflow(RateLimitWorkflow())

Expand Down
8 changes: 5 additions & 3 deletions examples/simple/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#Python
import time

from dotenv import load_dotenv
Expand All @@ -8,7 +9,7 @@

hatchet = Hatchet(debug=True)


#START how-to-use-step-level-retries
@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
@hatchet.step(timeout="11s", retries=3)
Expand All @@ -19,14 +20,15 @@ def step1(self, context: Context):
return {
"step1": "step1",
}

#END how-to-use-step-level-retries

def main():
#START registering_workflows_starting_workers
workflow = MyWorkflow()
worker = hatchet.worker("test-worker", max_runs=1)
worker.register_workflow(workflow)
worker.start()

#END registering_workflows_starting_workers

if __name__ == "__main__":
main()
8 changes: 5 additions & 3 deletions examples/sticky-workers/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#Python
from dotenv import load_dotenv

from hatchet_sdk import Context, Hatchet, StickyStrategy
Expand All @@ -6,7 +7,7 @@

hatchet = Hatchet(debug=True)


#START setting-sticky-assignment
@hatchet.workflow(on_events=["sticky:parent"], sticky=StickyStrategy.SOFT)
class StickyWorkflow:
@hatchet.step()
Expand All @@ -27,14 +28,15 @@ async def step2(self, context: Context):
await ref.result()

return {"worker": context.worker.id()}
#END setting-sticky-assignment


#START #sticky-child-workflows
@hatchet.workflow(on_events=["sticky:child"], sticky=StickyStrategy.SOFT)
class StickyChildWorkflow:
@hatchet.step()
def child(self, context: Context):
return {"worker": context.worker.id()}

#END sticky-child-workflows

worker = hatchet.worker("sticky-worker", max_runs=10)
worker.register_workflow(StickyWorkflow())
Expand Down
12 changes: 7 additions & 5 deletions examples/timeout/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#Python
import time

from dotenv import load_dotenv
Expand All @@ -8,16 +9,17 @@

hatchet = Hatchet(debug=True)


#START scheduling-timeouts
@hatchet.workflow(on_events=["timeout:create"])
class TimeoutWorkflow:

#END scheduling-timeouts
#START step-timeouts
@hatchet.step(timeout="4s")
def step1(self, context: Context):
time.sleep(5)
return {"status": "success"}


#END step-timeouts
#START refreshing-timeouts
@hatchet.workflow(on_events=["refresh:create"])
class RefreshTimeoutWorkflow:

Expand All @@ -28,7 +30,7 @@ def step1(self, context: Context):
time.sleep(5)

return {"status": "success"}

#END refreshing-timeouts

def main():
worker = hatchet.worker("timeout-worker", max_runs=4)
Expand Down