Skip to content

Commit

Permalink
Merge pull request #58 from daichitakahashi/debugMarkLostDispatches
Browse files Browse the repository at this point in the history
chore: add debug log for markLostDispatches
  • Loading branch information
daichitakahashi authored Dec 2, 2024
2 parents ebc4f22 + be464f4 commit a04ac94
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .changeset/eighty-lobsters-burn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"cf-eventhub": patch
---

Add debug log for markLostDispatches
30 changes: 19 additions & 11 deletions cf-eventhub/src/core/hub/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ export class EventSink {
continuationToken?: string;
}): Promise<{ list: ResultedDispatch[]; continuationToken?: string }> {
const repo = this.repo;
const elapsedSeconds =
args?.elapsedSeconds !== undefined ? args.elapsedSeconds : 60 * 15; // Default value (15 min) is derived from duration limit of Queue Consumers.
const logger = this.logger;
const elapsedSeconds = args?.elapsedSeconds || 60 * 15; // Default value (15 min) is derived from duration limit of Queue Consumers.

const result = await repo.enterTransactionalScope(async (tx) =>
safeTry(async function* () {
Expand All @@ -205,24 +205,32 @@ export class EventSink {
)
).safeUnwrap();

const lostDispatches = listResult.list.filter((d) => {
const lostDispatches = listResult.list.flatMap((d) => {
if (d.status !== "ongoing") {
return [];
}
const lastTime =
d.executionLog.length > 0
? d.executionLog[d.executionLog.length - 1].executedAt
: d.createdAt;
const elapsed = Date.now() - lastTime.getTime();
const elapsed = (Date.now() - lastTime.getTime()) / 1000;

return elapsed > (d.delaySeconds || 0) + elapsedSeconds // elapsed from last execution or its creation.
? [makeDispatchLost(d, new Date())]
: [];
});

return elapsed > ((d.delaySeconds || 0) + elapsedSeconds) * 1000; // elapsed from last execution or its creation.
logger.debug("markLostDispatches", {
ongoingDispatches: listResult.list,
lostDispatches,
elapsedSeconds,
});

const result: ResultedDispatch[] = [];
for (const d of lostDispatches) {
const resulted = makeDispatchLost(d as OngoingDispatch, new Date());
result.push(resulted);
yield* (await tx.saveDispatch(resulted)).safeUnwrap();
for (const lost of lostDispatches) {
yield* (await tx.saveDispatch(lost)).safeUnwrap();
}
return ok({
list: result,
list: lostDispatches,
continuationToken: listResult.continuationToken,
});
}),
Expand Down

0 comments on commit a04ac94

Please sign in to comment.