Skip to content

Commit

Permalink
Add global transaction management (#9)
Browse files Browse the repository at this point in the history
Introduced IGlobalTransactionManager interface and its implementation to manage global transactions. Refactored the existing VaultTransaction class and updated dependency injection to support the new global transaction management.
  • Loading branch information
kerem-acer authored Nov 9, 2024
1 parent 202b5af commit 3ade825
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 17 deletions.
9 changes: 9 additions & 0 deletions src/Core/Extensions/MongoVaultServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Driver;

namespace MongoFlow;

Expand Down Expand Up @@ -37,4 +38,12 @@ public static IServiceCollection AddMongoVault<TInterface, TVault>(this IService

return services;
}

public static IServiceCollection AddMongoVaultGlobalTransaction(this IServiceCollection services, Func<IServiceProvider, MongoClient> mongoClientFactory)
{
services.AddScoped<IGlobalTransactionManager>(serviceProvider =>
new MongoGlobalTransactionManager(mongoClientFactory(serviceProvider)));

return services;
}
}
20 changes: 15 additions & 5 deletions src/Core/MongoVault.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Driver;

namespace MongoFlow;
Expand All @@ -7,7 +8,7 @@ public abstract class MongoVault : IDisposable
private readonly VaultConfigurationManager _configurationManager;
private readonly List<VaultOperation> _operations = [];

private MongoVaultTransaction? _transaction;
private VaultTransaction? _transaction;

protected MongoVault(VaultConfigurationManager configurationManager)
{
Expand All @@ -28,6 +29,8 @@ protected MongoVault(VaultConfigurationManager configurationManager)
internal VaultConfiguration Configuration => _configurationManager.Configuration;

internal IServiceProvider ServiceProvider => _configurationManager.ServiceProvider;

private IGlobalTransactionManager? GlobalTransactionManager => ServiceProvider.GetService<IGlobalTransactionManager>();

internal IMongoDatabase MongoDatabase => Configuration.Database!;

Expand All @@ -45,14 +48,19 @@ public DocumentSet<TDocument> Set<TDocument>(bool ignoreQueryFilter = false)

public bool IsInTransaction => _transaction is not null;

public IMongoVaultTransaction BeginTransaction()
public IVaultTransaction BeginTransaction()
{
if (_transaction is not null)
{
throw new InvalidOperationException("Transaction already started");
}

if (GlobalTransactionManager?.CurrentTransaction is not null)
{
throw new InvalidOperationException("BeginTransaction cannot be called inside a global transaction.");
}

_transaction = new MongoVaultTransaction(this, MongoDatabase.Client.StartSession());
_transaction = new VaultTransaction(this, MongoDatabase.Client.StartSession());

return _transaction;
}
Expand All @@ -77,9 +85,11 @@ public virtual async Task<int> SaveAsync(CancellationToken cancellationToken = d
{
return 0;
}

var transaction = GlobalTransactionManager?.CurrentTransaction ?? _transaction;

var session = _transaction is not null
? _transaction.GetSession()
var session = transaction is not null
? transaction.Session
: await MongoDatabase.Client.StartSessionAsync(cancellationToken: cancellationToken);

if (!session.IsInTransaction)
Expand Down
7 changes: 7 additions & 0 deletions src/Core/Transactions/IGlobalTransactionManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace MongoFlow;

public interface IGlobalTransactionManager
{
IVaultTransaction? CurrentTransaction { get; }
Task<IVaultTransaction> BeginAsync(CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using MongoDB.Driver;

namespace MongoFlow;

public interface IMongoVaultTransaction : IDisposable
public interface IVaultTransaction : IDisposable
{
Task CommitAsync(CancellationToken cancellationToken = default);
Task RollbackAsync(CancellationToken cancellationToken = default);
IClientSessionHandle Session { get; }
}
35 changes: 35 additions & 0 deletions src/Core/Transactions/MongoGlobalTransaction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using MongoDB.Driver;

namespace MongoFlow;

internal sealed class MongoGlobalTransaction : IVaultTransaction
{
public MongoGlobalTransaction(IClientSessionHandle session)
{
if (session.IsInTransaction)
{
throw new InvalidOperationException("MongoVaultTransaction can only be created from an inactive transaction.");
}

session.StartTransaction();

Session = session;
}

public void Dispose()
{
Session.Dispose();
}

public async Task CommitAsync(CancellationToken cancellationToken = default)
{
await Session.CommitTransactionAsync(cancellationToken);
}

public async Task RollbackAsync(CancellationToken cancellationToken = default)
{
await Session.AbortTransactionAsync(cancellationToken);
}

public IClientSessionHandle Session { get; }
}
43 changes: 43 additions & 0 deletions src/Core/Transactions/MongoGlobalTransactionManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using MongoDB.Driver;

namespace MongoFlow;

internal sealed class MongoGlobalTransactionManager : IGlobalTransactionManager, IDisposable
{
private readonly MongoClient _mongoClient;
private readonly SemaphoreSlim _semaphore;

public MongoGlobalTransactionManager(MongoClient mongoClient)
{
_mongoClient = mongoClient;
_semaphore = new SemaphoreSlim(1, 1);
}

public IVaultTransaction? CurrentTransaction { get; private set; }
public async Task<IVaultTransaction> BeginAsync(CancellationToken cancellationToken = default)
{
await _semaphore.WaitAsync(cancellationToken);
try
{
if (CurrentTransaction != null)
{
throw new InvalidOperationException("Transaction already started.");
}

var session = await _mongoClient.StartSessionAsync(cancellationToken: cancellationToken);
CurrentTransaction = new MongoGlobalTransaction(session);
return CurrentTransaction;
}
finally
{
_semaphore.Release();
}
}

public void Dispose()
{
_mongoClient.Dispose();
_semaphore.Dispose();
CurrentTransaction?.Dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

namespace MongoFlow;

internal sealed class MongoVaultTransaction : IMongoVaultTransaction
internal sealed class VaultTransaction : IVaultTransaction
{
private readonly MongoVault _vault;
private readonly IClientSessionHandle _session;

public MongoVaultTransaction(MongoVault vault,
public VaultTransaction(MongoVault vault,
IClientSessionHandle session)
{
if (session.IsInTransaction)
Expand All @@ -18,29 +17,26 @@ public MongoVaultTransaction(MongoVault vault,
session.StartTransaction();

_vault = vault;
_session = session;
Session = session;
}

public void Dispose()
{
_session.Dispose();
Session.Dispose();
_vault.ClearTransaction();
}

public async Task CommitAsync(CancellationToken cancellationToken = default)
{
await _session.CommitTransactionAsync(cancellationToken);
await Session.CommitTransactionAsync(cancellationToken);
_vault.ClearTransaction();
}

public async Task RollbackAsync(CancellationToken cancellationToken = default)
{
await _session.AbortTransactionAsync(cancellationToken);
await Session.AbortTransactionAsync(cancellationToken);
_vault.ClearTransaction();
}

internal IClientSessionHandle GetSession()
{
return _session;
}
public IClientSessionHandle Session { get; }
}

0 comments on commit 3ade825

Please sign in to comment.