Skip to content

Commit

Permalink
applying new Writer
Browse files Browse the repository at this point in the history
  • Loading branch information
j0nimost committed Oct 26, 2023
1 parent cac89c4 commit a422343
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 49 deletions.
28 changes: 15 additions & 13 deletions src/Kafa/Kafa.Stream.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using nyingi.Kafa.Reader;
using System.Buffers;
using nyingi.Kafa.Reader;
using nyingi.Kafa.Reflection;
using System.IO;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Xml;
using nyingi.Kafa.Writer;
using static nyingi.Kafa.Reader.KafaReader;

namespace nyingi.Kafa
Expand Down Expand Up @@ -76,25 +78,24 @@ private static async ValueTask<RowEnumerable> ReadProcessorAsync(KafaReadState k

return reader.GetRows();
}

public static async ValueTask<TextWriter> WriteAsync<T>(List<T> entities, KafaOptions options =null)
public static ReadOnlySpan<byte> Write<T>(List<T> entities, KafaOptions options = null)

Check warning on line 81 in src/Kafa/Kafa.Stream.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.
{
ArgumentNullException.ThrowIfNull(entities, nameof(entities));
var reflection = SetupOptions<T>(options);
using var strWriter = new StringWriter(new StringBuilder(4096)); //4k chars
return await reflection.GetProperties<T>(entities, strWriter);
using var pooledBufferWriter = new KafaPooledWriter();
using var bufferWriter = new KafaWriter(pooledBufferWriter, reflection.TypeInfo.KafaOptions);
reflection.GetProperties<T>(bufferWriter, entities);
return pooledBufferWriter.WrittenAsSpan;
}

public static async ValueTask<MemoryStream> WriteToStreamAsync<T>(List<T> entities, KafaOptions options = null)
public static async ValueTask<Stream> WriteToStreamAsync<T>(List<T> entities, KafaOptions options = null)

Check warning on line 91 in src/Kafa/Kafa.Stream.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.
{
ArgumentNullException.ThrowIfNull(entities, nameof(entities));
var reflection = SetupOptions<T>(options);
var memoryStream = new MemoryStream();
using var strWriter = new StreamWriter(memoryStream, leaveOpen: true);
var textStream = await reflection.GetProperties<T>(entities, strWriter);
textStream.Flush();
memoryStream.Seek(0, SeekOrigin.Begin);

using var bufferWriter = new KafaWriter(memoryStream, reflection.TypeInfo.KafaOptions);
reflection.GetProperties<T>(bufferWriter, entities);
await bufferWriter.FlushAsync();
return memoryStream;
}

Expand All @@ -104,8 +105,9 @@ public static async ValueTask WriteToFileAsync<T>(List<T> entities, string path,
ArgumentNullException.ThrowIfNull(path, nameof(path));
var reflection = SetupOptions<T>(options);
using var fs = new FileStream(path, FileMode.Create);
using var strWriter = new StreamWriter(fs, options.Encoding!, 512);
await reflection.GetProperties<T>(entities, strWriter);
using var bufferWriter = new KafaWriter(fs, reflection.TypeInfo.KafaOptions);
reflection.GetProperties<T>(bufferWriter, entities);
await bufferWriter.FlushAsync();
}

private static KafaReflection SetupOptions<T>(KafaOptions options)
Expand Down
19 changes: 9 additions & 10 deletions src/Kafa/Reflection/KafaReflection.Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
using System.Reflection;
using System.Reflection.Emit;
using System.Text;
using nyingi.Kafa.Writer;

namespace nyingi.Kafa.Reflection
{
internal partial class KafaReflection
{
public async Task<TextWriter> GetProperties<T>(List<T> entities, TextWriter textWriter)
public void GetProperties<T>(in KafaWriter writer, List<T> entities)
{

bool readHeader = false;
Expand All @@ -29,42 +30,40 @@ public async Task<TextWriter> GetProperties<T>(List<T> entities, TextWriter text

if (kafa != null && !string.IsNullOrEmpty(kafa.FieldName))
{
await textWriter.WriteAsync(kafa.FieldName);
writer.Write(kafa.FieldName);

}
else
{
await textWriter.WriteAsync(propertyInfo.Name);
writer.Write(propertyInfo.Name);
}

if (countHeader < propertyInfos.Length - 1)
{
await textWriter.WriteAsync((char)TypeInfo.KafaOptions.Separator);
writer.WriteSeparator();

}
countHeader++;
}

await textWriter.WriteLineAsync();
writer.WriteLine();
readHeader= true;
}

propertyCount = propertyInfos.Length;
foreach (var propertyInfo in propertyInfos)
{
await textWriter.WriteAsync($"{propertyInfo.GetValue(entity)}");
writer.Write($"{propertyInfo.GetValue(entity)}");

if (count < propertyCount - 1)
{
await textWriter.WriteAsync((char)TypeInfo.KafaOptions.Separator);
writer.WriteSeparator();
}
count++;
}
await textWriter.WriteLineAsync();
writer.WriteLine();
count = 0;
}

return textWriter;
}
}
}
3 changes: 1 addition & 2 deletions src/Kafa/Writer/KafaPooledWriter.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Buffers;
using System.Transactions;

namespace nyingi.Kafa.Writer
{
Expand All @@ -12,7 +11,7 @@ internal sealed class KafaPooledWriter : IBufferWriter<byte>, IDisposable

private int Capacity => _buffer.Length;
private int FreeCapacity => _buffer.Length - _index;
public KafaPooledWriter(int length)
public KafaPooledWriter(int length = 0)
{
_buffer = ArrayPool<byte>.Shared.Rent(Math.Max(length, DefaultBufferLength));
}
Expand Down
36 changes: 34 additions & 2 deletions src/Kafa/Writer/KafaWriter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Buffers;
using System.Runtime.InteropServices;
using System.Text;

namespace nyingi.Kafa.Writer
{
Expand All @@ -7,16 +9,46 @@ internal class KafaWriter : IDisposable
private IBufferWriter<byte>? _bufferWriter;
private KafaPooledWriter? _kafaPooledWriter;
private Stream? _stream = default;
public KafaWriter(IBufferWriter<byte> bufferWriter)

private readonly KafaOptions _options;


private byte[] _separator = new byte[1];
public KafaWriter(in IBufferWriter<byte> bufferWriter, KafaOptions options)
{
_bufferWriter = bufferWriter ?? throw new NullReferenceException(nameof(bufferWriter));
_options = options;
_separator[0] = (byte)_options.Separator;
}
public KafaWriter(Stream stream)
public KafaWriter(in Stream stream, KafaOptions options)
{
_stream = stream;
_options = options;
_kafaPooledWriter = new KafaPooledWriter(0); // use the default 65k
}

public void WriteSeparator()
{
Write(_separator.AsSpan());
}

public void WriteLine()
{
var newLine = new byte[2]
{
(byte)'\r',
(byte)'\n'
};
Write(newLine);

}

public void Write(string str)
{
var strBytes = Encoding.UTF8.GetBytes(str);
Write(strBytes.AsSpan());
}

public void Write(ReadOnlySpan<byte> values)
{
if (_bufferWriter != null)
Expand Down
32 changes: 10 additions & 22 deletions src/KafaTests/KafaWriteTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ public class KafaWriteTests
{

[Fact]
public async Task WriteCSVNoHeaderAsync()
public void WriteCSVNoHeader()
{
var csvs = new List<CsvData>()
{
new CsvData{ Date = DateTime.Parse("10/10/2023 4:09:45 PM"), Open=12.45, Close=12.99, High=13.00, Low=12.1, Name="AMZN", Volume=1233435512},
new CsvData{ Date = DateTime.Parse("10/10/2023 4:09:45 PM"), Open=12.45, Close=12.99, High=13.00, Low=12.1, Name="AMZN", Volume=1233435512}
};

var rowmem = await Kafa.WriteAsync<CsvData>(csvs, new KafaOptions() { HasHeader = false, Separator=SeparatorFileType.CSV});
var rowmem = Kafa.Write(csvs, new KafaOptions() { HasHeader = false, Separator=SeparatorFileType.CSV});
string expected = "";

if (Environment.OSVersion.Platform == PlatformID.Unix)
Expand All @@ -23,14 +23,15 @@ public async Task WriteCSVNoHeaderAsync()
{
expected = "10/10/2023 4:09:45 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n10/10/2023 4:09:45 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n";
}
var str = rowmem.ToString();

var str = Encoding.UTF8.GetString(rowmem);
Assert.NotNull(str);
Assert.NotEmpty(str);
Assert.Equal(expected, str);
}

[Fact]
public async Task WriteCSVWithDefaultHeaderAsync()
public void WriteCSVWithDefaultHeader()
{
var csvs = new List<CsvData>()
{
Expand All @@ -47,15 +48,15 @@ public async Task WriteCSVWithDefaultHeaderAsync()
{
expected = "Date,Open,High,Low,Close,Volume,Name\r\n10/10/2023 4:08:38 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n10/10/2023 4:08:38 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n";
}
var rowmem = await Kafa.WriteAsync<CsvData>(csvs);
var str = rowmem.ToString();
var rowmem = Kafa.Write<CsvData>(csvs);
var str = Encoding.UTF8.GetString(rowmem);
Assert.NotNull(str);
Assert.NotEmpty(str);
Assert.Equal(expected, str);
}

[Fact]
public async Task WriteCSVWithAttributeHeaderAsync()
public void WriteCSVWithAttributeHeader()
{
var csvs = new List<CSVDataWithAttributes>()
{
Expand All @@ -72,8 +73,8 @@ public async Task WriteCSVWithAttributeHeaderAsync()
{
expected = "date,Open,High,Low,Close,Volume,name\r\n10/10/2023 4:08:38 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n10/10/2023 4:08:38 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n";
}
var rowmem = await Kafa.WriteAsync<CSVDataWithAttributes>(csvs);
var str = rowmem.ToString();
var rowmem = Kafa.Write<CSVDataWithAttributes>(csvs);
var str = Encoding.UTF8.GetString(rowmem);
Assert.NotNull(str);
Assert.NotEmpty(str);
Assert.Equal(expected, str);
Expand All @@ -87,21 +88,8 @@ public async Task WriteCSVToStreamAsync()
new CsvData{ Date = DateTime.Parse("10/10/2023 4:08:38 PM"), Open=12.45, Close=12.99, High=13.00, Low=12.1, Name="AMZN", Volume=1233435512},
new CsvData{ Date = DateTime.Parse("10/10/2023 4:08:38 PM"), Open=12.45, Close=12.99, High=13.00, Low=12.1, Name="AMZN", Volume=1233435512}
};
string expected = "";

if (Environment.OSVersion.Platform == PlatformID.Unix)
{
expected = "Date,Open,High,Low,Close,Volume,Name\n10/10/2023 16:08:38,12.45,13,12.1,12.99,1233435512,AMZN\n10/10/2023 16:08:38,12.45,13,12.1,12.99,1233435512,AMZN\n";
}
else
{
expected = "Date,Open,High,Low,Close,Volume,Name\r\n10/10/2023 4:08:38 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n10/10/2023 4:08:38 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n";
}
using var stream = await Kafa.WriteToStreamAsync<CsvData>(csvs);
Assert.NotNull(stream);

var result = Encoding.UTF8.GetString(stream.GetBuffer(), 0, (int)stream.Length);
Assert.Equal(expected, result);
}
}
}

0 comments on commit a422343

Please sign in to comment.