Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: swallowed exceptions in timed out WithCircuitBreaker() #7362

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,19 @@ protected internal override bool AroundReceive(Receive receive, object message)
/// TBD
/// </summary>
/// <param name="messages">TBD</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to stop async operation</param>
/// <exception cref="TimeoutException">
/// This exception is thrown when the store has not been initialized.
/// </exception>
/// <returns>TBD</returns>
protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken = default)
{
var trueMsgs = messages.ToArray();

if (_store == null)
return StoreNotInitialized<IImmutableList<Exception>>();

return _store.Ask<object>(sender => new WriteMessages(trueMsgs, sender, 1), Timeout, CancellationToken.None)
return _store.Ask<object>(sender => new WriteMessages(trueMsgs, sender, 1), Timeout, cancellationToken)
.ContinueWith(r =>
{
if (r.IsCanceled)
Expand All @@ -195,18 +196,19 @@ protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerabl
/// </summary>
/// <param name="persistenceId">TBD</param>
/// <param name="toSequenceNr">TBD</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to stop async operation</param>
/// <exception cref="TimeoutException">
/// This exception is thrown when the store has not been initialized.
/// </exception>
/// <returns>TBD</returns>
protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken = default)
{
if (_store == null)
return StoreNotInitialized<object>();

var result = new TaskCompletionSource<object>();

_store.Ask<object>(sender => new DeleteMessagesTo(persistenceId, toSequenceNr, sender), Timeout, CancellationToken.None).ContinueWith(r =>
_store.Ask<object>(sender => new DeleteMessagesTo(persistenceId, toSequenceNr, sender), Timeout, cancellationToken).ContinueWith(r =>
{
if (r.IsFaulted)
result.TrySetException(r.Exception);
Expand Down Expand Up @@ -250,18 +252,19 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten
/// </summary>
/// <param name="persistenceId">TBD</param>
/// <param name="fromSequenceNr">TBD</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to stop async operation</param>
/// <exception cref="TimeoutException">
/// This exception is thrown when the store has not been initialized.
/// </exception>
/// <returns>TBD</returns>
public override Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
public override Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken = default)
{
if (_store == null)
return StoreNotInitialized<long>();

var result = new TaskCompletionSource<long>();

_store.Ask<object>(sender => new ReplayMessages(0, 0, 0, persistenceId, sender), Timeout, CancellationToken.None)
_store.Ask<object>(sender => new ReplayMessages(0, 0, 0, persistenceId, sender), Timeout, cancellationToken)
.ContinueWith(t =>
{
if (t.IsFaulted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
Expand Down Expand Up @@ -89,14 +90,14 @@ protected internal override bool AroundReceive(Receive receive, object message)
return true;
}

protected async override Task DeleteAsync(SnapshotMetadata metadata)
protected override async Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken = default)
{
if (_store == null)
throw new TimeoutException("Store not intialized.");
var s = Sender;
try
{
var response = await _store.Ask(new DeleteSnapshot(metadata), Timeout);
var response = await _store.Ask(new DeleteSnapshot(metadata), Timeout, cancellationToken);
if (response is DeleteSnapshotFailure f)
{
ExceptionDispatchInfo.Capture(f.Cause).Throw();
Expand All @@ -108,14 +109,14 @@ protected async override Task DeleteAsync(SnapshotMetadata metadata)
}
}

protected async override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default)
{
if (_store == null)
throw new TimeoutException("Store not intialized.");
var s = Sender;
try
{
var response = await _store.Ask(new DeleteSnapshots(persistenceId, criteria), Timeout);
var response = await _store.Ask(new DeleteSnapshots(persistenceId, criteria), Timeout, cancellationToken);
if (response is DeleteSnapshotsFailure f)
{
ExceptionDispatchInfo.Capture(f.Cause).Throw();
Expand All @@ -127,14 +128,14 @@ protected async override Task DeleteAsync(string persistenceId, SnapshotSelectio
}
}

protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default)
{
if (_store == null)
throw new TimeoutException("Store not intialized.");
var s = Sender;
try
{
var response = await _store.Ask(new LoadSnapshot(persistenceId, criteria, criteria.MaxSequenceNr), Timeout);
var response = await _store.Ask(new LoadSnapshot(persistenceId, criteria, criteria.MaxSequenceNr), Timeout, cancellationToken);
switch (response)
{
case LoadSnapshotResult ls:
Expand All @@ -154,14 +155,14 @@ protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId,
throw new TimeoutException();
}

protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot)
protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot, CancellationToken cancellationToken = default)
{
if (_store == null)
throw new TimeoutException("Store not intialized.");
var s = Sender;
try
{
var response = await _store.Ask(new SaveSnapshot(metadata, snapshot), Timeout);
var response = await _store.Ask(new SaveSnapshot(metadata, snapshot), Timeout, cancellationToken);
if (response is SaveSnapshotFailure f)
{
ExceptionDispatchInfo.Capture(f.Cause).Throw();
Expand Down
Loading
Loading