From a422343edf3259958cdecec89a50c9b54aa9b986 Mon Sep 17 00:00:00 2001 From: j0nimost Date: Thu, 26 Oct 2023 08:20:01 +0300 Subject: [PATCH] applying new Writer --- src/Kafa/Kafa.Stream.cs | 28 ++++++++------- src/Kafa/Reflection/KafaReflection.Reader.cs | 19 +++++------ src/Kafa/Writer/KafaPooledWriter.cs | 3 +- src/Kafa/Writer/KafaWriter.cs | 36 ++++++++++++++++++-- src/KafaTests/KafaWriteTests.cs | 32 ++++++----------- 5 files changed, 69 insertions(+), 49 deletions(-) diff --git a/src/Kafa/Kafa.Stream.cs b/src/Kafa/Kafa.Stream.cs index 8245f71..024c1d8 100644 --- a/src/Kafa/Kafa.Stream.cs +++ b/src/Kafa/Kafa.Stream.cs @@ -1,10 +1,12 @@ -using nyingi.Kafa.Reader; +using System.Buffers; +using nyingi.Kafa.Reader; using nyingi.Kafa.Reflection; using System.IO; using System.Runtime.Serialization; using System.Text; using System.Threading; using System.Xml; +using nyingi.Kafa.Writer; using static nyingi.Kafa.Reader.KafaReader; namespace nyingi.Kafa @@ -76,25 +78,24 @@ private static async ValueTask ReadProcessorAsync(KafaReadState k return reader.GetRows(); } - - public static async ValueTask WriteAsync(List entities, KafaOptions options =null) + public static ReadOnlySpan Write(List entities, KafaOptions options = null) { ArgumentNullException.ThrowIfNull(entities, nameof(entities)); var reflection = SetupOptions(options); - using var strWriter = new StringWriter(new StringBuilder(4096)); //4k chars - return await reflection.GetProperties(entities, strWriter); + using var pooledBufferWriter = new KafaPooledWriter(); + using var bufferWriter = new KafaWriter(pooledBufferWriter, reflection.TypeInfo.KafaOptions); + reflection.GetProperties(bufferWriter, entities); + return pooledBufferWriter.WrittenAsSpan; } - public static async ValueTask WriteToStreamAsync(List entities, KafaOptions options = null) + public static async ValueTask WriteToStreamAsync(List entities, KafaOptions options = null) { ArgumentNullException.ThrowIfNull(entities, nameof(entities)); var reflection = SetupOptions(options); var memoryStream = new MemoryStream(); - using var strWriter = new StreamWriter(memoryStream, leaveOpen: true); - var textStream = await reflection.GetProperties(entities, strWriter); - textStream.Flush(); - memoryStream.Seek(0, SeekOrigin.Begin); - + using var bufferWriter = new KafaWriter(memoryStream, reflection.TypeInfo.KafaOptions); + reflection.GetProperties(bufferWriter, entities); + await bufferWriter.FlushAsync(); return memoryStream; } @@ -104,8 +105,9 @@ public static async ValueTask WriteToFileAsync(List entities, string path, ArgumentNullException.ThrowIfNull(path, nameof(path)); var reflection = SetupOptions(options); using var fs = new FileStream(path, FileMode.Create); - using var strWriter = new StreamWriter(fs, options.Encoding!, 512); - await reflection.GetProperties(entities, strWriter); + using var bufferWriter = new KafaWriter(fs, reflection.TypeInfo.KafaOptions); + reflection.GetProperties(bufferWriter, entities); + await bufferWriter.FlushAsync(); } private static KafaReflection SetupOptions(KafaOptions options) diff --git a/src/Kafa/Reflection/KafaReflection.Reader.cs b/src/Kafa/Reflection/KafaReflection.Reader.cs index c74ef79..dc954bf 100644 --- a/src/Kafa/Reflection/KafaReflection.Reader.cs +++ b/src/Kafa/Reflection/KafaReflection.Reader.cs @@ -3,12 +3,13 @@ using System.Reflection; using System.Reflection.Emit; using System.Text; +using nyingi.Kafa.Writer; namespace nyingi.Kafa.Reflection { internal partial class KafaReflection { - public async Task GetProperties(List entities, TextWriter textWriter) + public void GetProperties(in KafaWriter writer, List entities) { bool readHeader = false; @@ -29,42 +30,40 @@ public async Task GetProperties(List entities, TextWriter text if (kafa != null && !string.IsNullOrEmpty(kafa.FieldName)) { - await textWriter.WriteAsync(kafa.FieldName); + writer.Write(kafa.FieldName); } else { - await textWriter.WriteAsync(propertyInfo.Name); + writer.Write(propertyInfo.Name); } if (countHeader < propertyInfos.Length - 1) { - await textWriter.WriteAsync((char)TypeInfo.KafaOptions.Separator); + writer.WriteSeparator(); } countHeader++; } - await textWriter.WriteLineAsync(); + writer.WriteLine(); readHeader= true; } propertyCount = propertyInfos.Length; foreach (var propertyInfo in propertyInfos) { - await textWriter.WriteAsync($"{propertyInfo.GetValue(entity)}"); + writer.Write($"{propertyInfo.GetValue(entity)}"); if (count < propertyCount - 1) { - await textWriter.WriteAsync((char)TypeInfo.KafaOptions.Separator); + writer.WriteSeparator(); } count++; } - await textWriter.WriteLineAsync(); + writer.WriteLine(); count = 0; } - - return textWriter; } } } diff --git a/src/Kafa/Writer/KafaPooledWriter.cs b/src/Kafa/Writer/KafaPooledWriter.cs index bf6098f..99c8a36 100644 --- a/src/Kafa/Writer/KafaPooledWriter.cs +++ b/src/Kafa/Writer/KafaPooledWriter.cs @@ -1,5 +1,4 @@ using System.Buffers; -using System.Transactions; namespace nyingi.Kafa.Writer { @@ -12,7 +11,7 @@ internal sealed class KafaPooledWriter : IBufferWriter, IDisposable private int Capacity => _buffer.Length; private int FreeCapacity => _buffer.Length - _index; - public KafaPooledWriter(int length) + public KafaPooledWriter(int length = 0) { _buffer = ArrayPool.Shared.Rent(Math.Max(length, DefaultBufferLength)); } diff --git a/src/Kafa/Writer/KafaWriter.cs b/src/Kafa/Writer/KafaWriter.cs index 8181ad3..d208384 100644 --- a/src/Kafa/Writer/KafaWriter.cs +++ b/src/Kafa/Writer/KafaWriter.cs @@ -1,4 +1,6 @@ using System.Buffers; +using System.Runtime.InteropServices; +using System.Text; namespace nyingi.Kafa.Writer { @@ -7,16 +9,46 @@ internal class KafaWriter : IDisposable private IBufferWriter? _bufferWriter; private KafaPooledWriter? _kafaPooledWriter; private Stream? _stream = default; - public KafaWriter(IBufferWriter bufferWriter) + + private readonly KafaOptions _options; + + + private byte[] _separator = new byte[1]; + public KafaWriter(in IBufferWriter bufferWriter, KafaOptions options) { _bufferWriter = bufferWriter ?? throw new NullReferenceException(nameof(bufferWriter)); + _options = options; + _separator[0] = (byte)_options.Separator; } - public KafaWriter(Stream stream) + public KafaWriter(in Stream stream, KafaOptions options) { _stream = stream; + _options = options; _kafaPooledWriter = new KafaPooledWriter(0); // use the default 65k } + public void WriteSeparator() + { + Write(_separator.AsSpan()); + } + + public void WriteLine() + { + var newLine = new byte[2] + { + (byte)'\r', + (byte)'\n' + }; + Write(newLine); + + } + + public void Write(string str) + { + var strBytes = Encoding.UTF8.GetBytes(str); + Write(strBytes.AsSpan()); + } + public void Write(ReadOnlySpan values) { if (_bufferWriter != null) diff --git a/src/KafaTests/KafaWriteTests.cs b/src/KafaTests/KafaWriteTests.cs index 3d6011e..0d0754f 100644 --- a/src/KafaTests/KafaWriteTests.cs +++ b/src/KafaTests/KafaWriteTests.cs @@ -4,7 +4,7 @@ public class KafaWriteTests { [Fact] - public async Task WriteCSVNoHeaderAsync() + public void WriteCSVNoHeader() { var csvs = new List() { @@ -12,7 +12,7 @@ public async Task WriteCSVNoHeaderAsync() new CsvData{ Date = DateTime.Parse("10/10/2023 4:09:45 PM"), Open=12.45, Close=12.99, High=13.00, Low=12.1, Name="AMZN", Volume=1233435512} }; - var rowmem = await Kafa.WriteAsync(csvs, new KafaOptions() { HasHeader = false, Separator=SeparatorFileType.CSV}); + var rowmem = Kafa.Write(csvs, new KafaOptions() { HasHeader = false, Separator=SeparatorFileType.CSV}); string expected = ""; if (Environment.OSVersion.Platform == PlatformID.Unix) @@ -23,14 +23,15 @@ public async Task WriteCSVNoHeaderAsync() { expected = "10/10/2023 4:09:45 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n10/10/2023 4:09:45 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n"; } - var str = rowmem.ToString(); + + var str = Encoding.UTF8.GetString(rowmem); Assert.NotNull(str); Assert.NotEmpty(str); Assert.Equal(expected, str); } [Fact] - public async Task WriteCSVWithDefaultHeaderAsync() + public void WriteCSVWithDefaultHeader() { var csvs = new List() { @@ -47,15 +48,15 @@ public async Task WriteCSVWithDefaultHeaderAsync() { expected = "Date,Open,High,Low,Close,Volume,Name\r\n10/10/2023 4:08:38 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n10/10/2023 4:08:38 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n"; } - var rowmem = await Kafa.WriteAsync(csvs); - var str = rowmem.ToString(); + var rowmem = Kafa.Write(csvs); + var str = Encoding.UTF8.GetString(rowmem); Assert.NotNull(str); Assert.NotEmpty(str); Assert.Equal(expected, str); } [Fact] - public async Task WriteCSVWithAttributeHeaderAsync() + public void WriteCSVWithAttributeHeader() { var csvs = new List() { @@ -72,8 +73,8 @@ public async Task WriteCSVWithAttributeHeaderAsync() { expected = "date,Open,High,Low,Close,Volume,name\r\n10/10/2023 4:08:38 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n10/10/2023 4:08:38 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n"; } - var rowmem = await Kafa.WriteAsync(csvs); - var str = rowmem.ToString(); + var rowmem = Kafa.Write(csvs); + var str = Encoding.UTF8.GetString(rowmem); Assert.NotNull(str); Assert.NotEmpty(str); Assert.Equal(expected, str); @@ -87,21 +88,8 @@ public async Task WriteCSVToStreamAsync() new CsvData{ Date = DateTime.Parse("10/10/2023 4:08:38 PM"), Open=12.45, Close=12.99, High=13.00, Low=12.1, Name="AMZN", Volume=1233435512}, new CsvData{ Date = DateTime.Parse("10/10/2023 4:08:38 PM"), Open=12.45, Close=12.99, High=13.00, Low=12.1, Name="AMZN", Volume=1233435512} }; - string expected = ""; - - if (Environment.OSVersion.Platform == PlatformID.Unix) - { - expected = "Date,Open,High,Low,Close,Volume,Name\n10/10/2023 16:08:38,12.45,13,12.1,12.99,1233435512,AMZN\n10/10/2023 16:08:38,12.45,13,12.1,12.99,1233435512,AMZN\n"; - } - else - { - expected = "Date,Open,High,Low,Close,Volume,Name\r\n10/10/2023 4:08:38 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n10/10/2023 4:08:38 PM,12.45,13,12.1,12.99,1233435512,AMZN\r\n"; - } using var stream = await Kafa.WriteToStreamAsync(csvs); Assert.NotNull(stream); - - var result = Encoding.UTF8.GetString(stream.GetBuffer(), 0, (int)stream.Length); - Assert.Equal(expected, result); } } }