Skip to content

Commit

Permalink
fix: ensure catchError functions always return source iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
jeengbe committed Jul 19, 2024
1 parent b8890f1 commit acd8fbd
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 58 deletions.
2 changes: 1 addition & 1 deletion docs/asynciterable/creating.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ let value, done;

## Brief Interlude - `AsyncSink`

Very rarely will we ever need to create these async-iterables by hand, however, if you need a collection that you can add to as well as iterate, we have the `AsyncSink` class. This class serves as a basis for some of our operators such as binding to events and DOM and Node.js streams.
Very rarely will we ever need to create these async-iterables by hand, however, if you need a collection that you can add to as well as iterate, we have the `AsyncSink` class. This class serves as a basis for some of our operators such as binding to events and DOM and Node.js streams.

```typescript
import { AsyncSink } from 'ix/asynciterable';
Expand Down
3 changes: 1 addition & 2 deletions docs/asynciterable/transforming.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,5 @@ await subscription.pipe(
.forEach(handleBatch)
```

Using this operator makes sure that if messages slow down you'll still
handle them in a reasonable time whereas using `buffer` would leave you stuck until you get
Using this operator makes sure that if messages slow down you'll still handle them in a reasonable time whereas using `buffer` would leave you stuck until you get
the right amount of messages.
30 changes: 16 additions & 14 deletions src/asynciterable/catcherror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,26 @@ export class CatchAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
error = null;
hasError = false;

while (1) {
let c = <TSource>{};
try {
while (1) {
let c = <TSource>{};

try {
const { done, value } = await it.next();
if (done) {
await returnAsyncIterator(it);
try {
const { done, value } = await it.next();
if (done) {
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
await returnAsyncIterator(it);
break;
}

yield c;
yield c;
}
} finally {
await returnAsyncIterator(it);
}

if (!hasError) {
Expand Down
29 changes: 16 additions & 13 deletions src/asynciterable/operators/catcherror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,26 @@ export class CatchWithAsyncIterable<TSource, TResult> extends AsyncIterableX<TSo
let hasError = false;
const source = wrapWithAbort(this._source, signal);
const it = source[Symbol.asyncIterator]();
while (1) {
let c = <IteratorResult<TSource>>{};

try {
c = await it.next();
if (c.done) {
await returnAsyncIterator(it);
try {
while (1) {
let c = <IteratorResult<TSource>>{};

try {
c = await it.next();
if (c.done) {
break;
}
} catch (e) {
err = await this._handler(e, signal);
hasError = true;
break;
}
} catch (e) {
err = await this._handler(e, signal);
hasError = true;
await returnAsyncIterator(it);
break;
}

yield c.value;
yield c.value;
}
} finally {
await returnAsyncIterator(it);
}

if (hasError) {
Expand Down
30 changes: 16 additions & 14 deletions src/iterable/catcherror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,26 @@ export class CatchIterable<TSource> extends IterableX<TSource> {
error = null;
hasError = false;

while (1) {
let c = <TSource>{};
try {
while (1) {
let c = <TSource>{};

try {
const { done, value } = it.next();
if (done) {
returnIterator(it);
try {
const { done, value } = it.next();
if (done) {
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
returnIterator(it);
break;
}

yield c;
yield c;
}
} finally {
returnIterator(it);
}

if (!hasError) {
Expand Down
31 changes: 17 additions & 14 deletions src/iterable/operators/catcherror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,27 @@ export class CatchWithIterable<TSource, TResult> extends IterableX<TSource | TRe
let err: Iterable<TResult> | undefined;
let hasError = false;
const it = this._source[Symbol.iterator]();
while (1) {
let done: boolean | undefined;
let value: TSource;

try {
({ done, value } = it.next());
if (done) {
returnIterator(it);
try {
while (1) {
let done: boolean | undefined;
let value: TSource;

try {
({ done, value } = it.next());
if (done) {
break;
}
} catch (e) {
err = this._handler(e);
hasError = true;
break;
}
} catch (e) {
err = this._handler(e);
hasError = true;
returnIterator(it);
break;
}

yield value;
yield value;
}
} finally {
returnIterator(it);
}

if (hasError) {
Expand Down

0 comments on commit acd8fbd

Please sign in to comment.