Skip to content

Commit

Permalink
implemented writer
Browse files Browse the repository at this point in the history
  • Loading branch information
j0nimost committed Oct 25, 2023
1 parent 5241425 commit cac89c4
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 13 deletions.
7 changes: 4 additions & 3 deletions src/Kafa/Writer/KafaPooledWriter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Buffers;
using System.Transactions;

namespace nyingi.Kafa.Writer
{
Expand All @@ -9,14 +10,14 @@ internal sealed class KafaPooledWriter : IBufferWriter<byte>, IDisposable

private int _index;

public int WrittenCount => _index;
public int Capacity => _buffer.Length;
public int FreeCapacity => _buffer.Length - _index;
private int Capacity => _buffer.Length;
private int FreeCapacity => _buffer.Length - _index;
public KafaPooledWriter(int length)
{
_buffer = ArrayPool<byte>.Shared.Rent(Math.Max(length, DefaultBufferLength));
}
public ReadOnlySpan<byte> WrittenAsSpan => _buffer.AsSpan(0, _index);
public ReadOnlyMemory<byte> WrittenAsMemory => _buffer.AsMemory(0, _index);
public void Advance(int count)
{
if(count < 0)
Expand Down
48 changes: 38 additions & 10 deletions src/Kafa/Writer/KafaWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,58 @@

namespace nyingi.Kafa.Writer
{
internal partial class KafaWriter : IDisposable
internal class KafaWriter : IDisposable
{
private IBufferWriter<byte> _bufferWriter;
private KafaPooledWriter _kafaPooledWriter;
public int BytesWritten { get; private set; }
private IBufferWriter<byte>? _bufferWriter;
private KafaPooledWriter? _kafaPooledWriter;
private Stream? _stream = default;

public KafaWriter(IBufferWriter<byte> bufferWriter)
{
_bufferWriter = bufferWriter;
_bufferWriter = bufferWriter ?? throw new NullReferenceException(nameof(bufferWriter));
}

public KafaWriter(KafaPooledWriter pooledWriter, Stream stream)
public KafaWriter(Stream stream)
{
_stream = stream;
_kafaPooledWriter = pooledWriter; // use the backing bufferWriter to flusH
_kafaPooledWriter = new KafaPooledWriter(0); // use the default 65k
}

public void Write(ReadOnlySpan<byte> values)
{
if (_bufferWriter != null)
{
_bufferWriter.Write(values);
}
else
{
_kafaPooledWriter?.Write(values);
}
}

public void Flush()
{
if (_stream != null)
{
_stream.Write(_kafaPooledWriter!.WrittenAsSpan);
_stream.Flush();
}
}

public async ValueTask FlushAsync(CancellationToken cancellationToken=default)
{
if (_stream != null)
{
await _stream.WriteAsync(_kafaPooledWriter!.WrittenAsMemory, cancellationToken);
await _stream.FlushAsync();
}
}
public void Dispose()
{
_stream = null;
_kafaPooledWriter = null;
_bufferWriter = null;
if (_kafaPooledWriter != null)
{
_kafaPooledWriter.Dispose();
}
}
}
}

0 comments on commit cac89c4

Please sign in to comment.