From d58be4dd7b7e2c42f865795d11207f051a07d4a9 Mon Sep 17 00:00:00 2001 From: Stefan Zeiger Date: Tue, 26 Nov 2024 21:04:43 +0100 Subject: [PATCH] PipeBufferedOutput --- README.md | 1 + .../main/scala/perfio/BenchmarkDataSet.scala | 185 ++++++++++++++++++ .../perfio/BufferedOutputNumBenchmark.scala | 92 ++------- .../scala/perfio/OutputToInputBenchmark.scala | 93 ++++----- .../main/java/perfio/BlockBufferedOutput.java | 16 +- core/src/main/java/perfio/BufferIterator.java | 54 ++--- core/src/main/java/perfio/BufferedOutput.java | 33 +++- .../main/java/perfio/PipeBufferedOutput.java | 140 +++++++++++++ .../perfio/SwitchingHeapBufferedInput.java | 30 ++- .../scala/perfio/BufferedOutputTest.scala | 1 + test/src/test/scala/perfio/TestUtil.scala | 10 + 11 files changed, 481 insertions(+), 174 deletions(-) create mode 100644 bench/src/main/scala/perfio/BenchmarkDataSet.scala create mode 100644 core/src/main/java/perfio/PipeBufferedOutput.java diff --git a/README.md b/README.md index 7ec8982..1d7841e 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ perfIO provides buffered streaming I/O abstractions for both binary and text dat | BufferedInput | BufferedInputStream, ByteArrayInputStream, DataInputStream, LimitedInputStream* | | BufferedOutput | BufferedOutputStream, DataOutputStream | | AccumulatingBufferedOutput | - | +| PipeBufferedOutput | PipedOutputStream | | ArrayBufferedOutput | ByteArrayOutputStream | | LineTokenizer | BufferedReader + InputStreamReader | | TextOutput | PrintWriter + BufferedWriter + OutputStreamWriter | diff --git a/bench/src/main/scala/perfio/BenchmarkDataSet.scala b/bench/src/main/scala/perfio/BenchmarkDataSet.scala new file mode 100644 index 0000000..8dd12e2 --- /dev/null +++ b/bench/src/main/scala/perfio/BenchmarkDataSet.scala @@ -0,0 +1,185 @@ +package perfio + +import com.esotericsoftware.kryo.io.Output +import org.openjdk.jmh.infra.Blackhole + +import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStream} +import
java.nio.ByteBuffer + +abstract class BenchmarkDataSet: + def byteSize: Int + def writeTo(out: OutputStream): Unit + def writeTo(out: BufferedOutput): Unit + def writeTo(out: Output): Unit // Kryo: little endian (safe) or native endian (unsafe) + def writeTo(out: ByteBuffer): Unit + def readFrom(bh: Blackhole, bin: BufferedInput): Unit + def readFrom(bh: Blackhole, in: InputStream): Unit + +object BenchmarkDataSet: + def forName(s: String): BenchmarkDataSet = s match + case "num" => NumDataSet + case "bytes" => BytesDataSet + case "chunks" => ChunksDataSet + + +object NumDataSet extends BenchmarkDataSet: + val count = 20000000 + val byteSize = count * 13 + + def writeTo(out: OutputStream): Unit = + val dout = new DataOutputStream(out) + var i = 0 + while i < count do + dout.writeByte(i) + dout.writeInt(i+100) + dout.writeLong(i+101) + i += 1 + dout.close() + + def writeTo(out: Output): Unit = + var i = 0 + while i < count do + out.writeByte(i) + out.writeInt(i+100) + out.writeLong(i+101) + i += 1 + out.close() + + def writeTo(out: ByteBuffer): Unit = + var i = 0 + while i < count do + out.put(i.toByte) + out.putInt(i+100) + out.putLong(i+101) + i += 1 + + def writeTo(out: BufferedOutput): Unit = + var i = 0 + while i < count do + out.int8(i.toByte) + out.int32b(i+100) + out.int64b(i+101) + i += 1 + out.close() + + def readFrom(bh: Blackhole, bin: BufferedInput): Unit = { + var i = 0 + while i < count do + bh.consume(bin.int8()) + bh.consume(bin.int32b()) + bh.consume(bin.int64b()) + i += 1 + bin.close() + } + + def readFrom(bh: Blackhole, in: InputStream): Unit = { + val din = new DataInputStream(in) + var i = 0 + while i < count do + bh.consume(din.readByte()) + bh.consume(din.readInt()) + bh.consume(din.readLong()) + i += 1 + din.close() + } + + +object BytesDataSet extends BenchmarkDataSet: + val count = 100000000 + val byteSize = count + + def writeTo(out: OutputStream): Unit = + var i = 0 + while i < count do + out.write(i.toByte) + i += 1 + out.close() + + 
def writeTo(out: Output): Unit = + var i = 0 + while i < count do + out.writeByte(i.toByte) + i += 1 + out.close() + + def writeTo(out: ByteBuffer): Unit = + var i = 0 + while i < count do + out.put(i.toByte) + i += 1 + + def writeTo(out: BufferedOutput): Unit = + var i = 0 + while i < count do + out.int8(i.toByte) + i += 1 + out.close() + + def readFrom(bh: Blackhole, bin: BufferedInput): Unit = { + var i = 0 + while i < count do + bh.consume(bin.int8()) + i += 1 + bin.close() + } + + def readFrom(bh: Blackhole, in: InputStream): Unit = { + var i = 0 + while i < count do + bh.consume(in.read().toByte) + i += 1 + in.close() + } + + +object ChunksDataSet extends BenchmarkDataSet: + val count = 100000 + val chunk = Array.tabulate[Byte](1024)(_.toByte) + val byteSize = count * 1024 + + def writeTo(out: OutputStream): Unit = + var i = 0 + while i < count do + out.write(chunk) + i += 1 + out.close() + + def writeTo(out: Output): Unit = + var i = 0 + while i < count do + out.write(chunk) + i += 1 + out.close() + + def writeTo(out: ByteBuffer): Unit = + var i = 0 + while i < count do + out.put(chunk) + i += 1 + + def writeTo(out: BufferedOutput): Unit = + var i = 0 + while i < count do + out.write(chunk) + i += 1 + out.close() + + def readFrom(bh: Blackhole, bin: BufferedInput): Unit = { + val a = new Array[Byte](1024) + var i = 0 + while i < count do + bin.bytes(a, 0, a.length) + bh.consume(a) + i += 1 + bin.close() + } + + def readFrom(bh: Blackhole, in: InputStream): Unit = { + val a = new Array[Byte](1024) + var i = 0 + while i < count do + in.readNBytes(a, 0, a.length) /* read(a) may return a short read (e.g. from a PipedInputStream); readNBytes blocks until the whole chunk is filled or EOF */ + bh.consume(a) + i += 1 + in.close() + } diff --git a/bench/src/main/scala/perfio/BufferedOutputNumBenchmark.scala b/bench/src/main/scala/perfio/BufferedOutputNumBenchmark.scala index 94031f0..5f8e26d 100644 --- a/bench/src/main/scala/perfio/BufferedOutputNumBenchmark.scala +++ b/bench/src/main/scala/perfio/BufferedOutputNumBenchmark.scala @@ -23,61 +23,22 @@ import java.util.concurrent.TimeUnit @State(Scope.Benchmark)
class BufferedOutputNumBenchmark extends BenchUtil: - val count = 20000000 - val byteSize = count * 13 - - // JDK: always big endian - private def writeTo(out: DataOutputStream): Unit = - var i = 0 - while i < count do - out.writeByte(i) - out.writeInt(i+100) - out.writeLong(i+101) - i += 1 - out.close() - - // Kryo: little endian (safe) or native endian (unsafe) - private def writeTo(out: Output): Unit = - var i = 0 - while i < count do - out.writeByte(i) - out.writeInt(i+100) - out.writeLong(i+101) - i += 1 - out.close() - - private def writeTo(out: BufferedOutput): Unit = - var i = 0 - while i < count do - out.int8(i.toByte) - out.int32(i+100) - out.int64(i+101) - i += 1 - out.close() - - private def writeInternalTo(out: BufferedOutput): Unit = - var i = 0 - while i < count do - val p = out.fwd(13) - out.buf(p) = i.toByte - BufferUtil.BA_INT_BIG.set(out.buf, p+1, i+100) - BufferUtil.BA_LONG_BIG.set(out.buf, p+5, (i+101).toLong) - i += 1 - out.close() + @Param(Array("num")) + var dataSet: String = null + final lazy val data = BenchmarkDataSet.forName(dataSet) + import data._ @Benchmark def array_DataOutputStream_growing(bh: Blackhole): Unit = val bout = new MyByteArrayOutputStream - val out = new DataOutputStream(bout) - writeTo(out) + writeTo(bout) bh.consume(bout.getSize) bh.consume(bout.getBuffer) @Benchmark def array_DataOutputStream_preallocated(bh: Blackhole): Unit = - val bout = new MyByteArrayOutputStream(count * 13) - val out = new DataOutputStream(bout) - writeTo(out) + val bout = new MyByteArrayOutputStream(byteSize) + writeTo(bout) bh.consume(bout.getSize) bh.consume(bout.getBuffer) @@ -91,7 +52,7 @@ class BufferedOutputNumBenchmark extends BenchUtil: @Benchmark def array_Kryo_preallocated(bh: Blackhole): Unit = - val out = new Output(count * 13) + val out = new Output(byteSize) writeTo(out) bh.consume(out.position()) @@ -105,8 +66,8 @@ class BufferedOutputNumBenchmark extends BenchUtil: @Benchmark def array_KryoUnsafe_preallocated(bh: Blackhole): 
Unit = - val bb = ByteBuffer.allocate(count * 13) - val out = new UnsafeOutput(count * 13) + val bb = ByteBuffer.allocate(byteSize) + val out = new UnsafeOutput(byteSize) writeTo(out) bh.consume(out.position()) @@ -120,7 +81,7 @@ class BufferedOutputNumBenchmark extends BenchUtil: @Benchmark def array_KryoBB_preallocated(bh: Blackhole): Unit = - val bb = ByteBuffer.allocate(count * 13) + val bb = ByteBuffer.allocate(byteSize) val out = new ByteBufferOutput(bb) writeTo(out) bh.consume(out.position()) @@ -135,8 +96,8 @@ class BufferedOutputNumBenchmark extends BenchUtil: @Benchmark def array_KryoBBUnsafe_preallocated(bh: Blackhole): Unit = - val bb = ByteBuffer.allocate(count * 13) - val out = new UnsafeByteBufferOutput(count * 13) + val bb = ByteBuffer.allocate(byteSize) + val out = new UnsafeByteBufferOutput(byteSize) writeTo(out) bh.consume(out.position()) @@ -150,20 +111,12 @@ class BufferedOutputNumBenchmark extends BenchUtil: @Benchmark def array_FlushingBufferedOutput_fixed(bh: Blackhole): Unit = - val bout = new MyByteArrayOutputStream(count * 13) + val bout = new MyByteArrayOutputStream(byteSize) val out = BufferedOutput.of(bout) writeTo(out) bh.consume(bout.getSize) bh.consume(bout.getBuffer) - @Benchmark - def array_FlushingBufferedOutput_internal_fixed(bh: Blackhole): Unit = - val bout = new MyByteArrayOutputStream(count * 13) - val out = BufferedOutput.of(bout) - writeInternalTo(out) - bh.consume(bout.getSize) - bh.consume(bout.getBuffer) - @Benchmark def array_FullyBufferedOutput_growing(bh: Blackhole): Unit = val out = BufferedOutput.growing() @@ -173,36 +126,29 @@ class BufferedOutputNumBenchmark extends BenchUtil: @Benchmark def array_FullyBufferedOutput_growing_preallocated(bh: Blackhole): Unit = - val out = BufferedOutput.growing(count*13) + val out = BufferedOutput.growing(byteSize) writeTo(out) bh.consume(out.buffer) bh.consume(out.length) @Benchmark def array_FullyBufferedOutput_fixed(bh: Blackhole): Unit = - val out = BufferedOutput.ofArray(new 
Array[Byte](count*13)) + val out = BufferedOutput.ofArray(new Array[Byte](byteSize)) writeTo(out) bh.consume(out.buffer) bh.consume(out.length) @Benchmark def array_ByteBuffer(bh: Blackhole): Unit = - val out = ByteBuffer.allocate(count*13) - var i = 0 - while i < count do - out.put(i.toByte) - out.putInt(i+100) - out.putLong(i+101) - i += 1 + val out = ByteBuffer.allocate(byteSize) + writeTo(out) bh.consume(out) @Benchmark def file_DataOutputStream(bh: Blackhole): Unit = val fout = new FileOutputStream("/dev/null") val bout = new BufferedOutputStream(fout) - val out = new DataOutputStream(bout) - writeTo(out) - out.close() + writeTo(bout) @Benchmark def file_FlushingBufferedOutput(bh: Blackhole): Unit = diff --git a/bench/src/main/scala/perfio/OutputToInputBenchmark.scala b/bench/src/main/scala/perfio/OutputToInputBenchmark.scala index 03c1246..d80312b 100644 --- a/bench/src/main/scala/perfio/OutputToInputBenchmark.scala +++ b/bench/src/main/scala/perfio/OutputToInputBenchmark.scala @@ -17,79 +17,62 @@ import java.util.concurrent.TimeUnit @State(Scope.Benchmark) class OutputToInputBenchmark extends BenchUtil: - val count = 20000000 - val byteSize = count * 13 - - private def writeTo(out: DataOutputStream): Unit = - var i = 0 - while i < count do - out.writeByte(i) - out.writeInt(i+100) - out.writeLong(i+101) - i += 1 - out.close() - - private def writeTo(out: BufferedOutput): Unit = - var i = 0 - while i < count do - out.int8(i.toByte) - out.int32(i+100) - out.int64(i+101) - i += 1 - out.close() - - private def readFrom(bh: Blackhole, bin: BufferedInput): Unit = { - var i = 0 - while(i < count) { - bh.consume(bin.int8()) - bh.consume(bin.int32()) - bh.consume(bin.int64()) - i += 1 - } - bin.close() - } - - def readFrom(bh: Blackhole, din: DataInputStream): Unit = { - var i = 0 - while(i < count) { - bh.consume(din.readByte()) - bh.consume(din.readInt()) - bh.consume(din.readLong()) - i += 1 - } - din.close() - } + @Param(Array("num")) + var dataSet: String = null 
+ final lazy val data = BenchmarkDataSet.forName(dataSet) + import data._ @Benchmark - def num_DataOutputStream_growing(bh: Blackhole): Unit = + def jdk_ByteArrayOutputStream_growing(bh: Blackhole): Unit = val bout = new MyByteArrayOutputStream - val out = new DataOutputStream(bout) - writeTo(out) - val in = new DataInputStream(new ByteArrayInputStream(bout.getBuffer, 0, bout.getSize)) + writeTo(bout) + val in = new ByteArrayInputStream(bout.getBuffer, 0, bout.getSize) readFrom(bh, in) @Benchmark - def num_DataOutputStream_preallocated(bh: Blackhole): Unit = - val bout = new MyByteArrayOutputStream(count * 13) - val out = new DataOutputStream(bout) - writeTo(out) - val in = new DataInputStream(new ByteArrayInputStream(bout.getBuffer, 0, bout.getSize)) + def jdk_ByteArrayOutputStream_preallocated(bh: Blackhole): Unit = + val bout = new MyByteArrayOutputStream(byteSize) + writeTo(bout) + val in = new ByteArrayInputStream(bout.getBuffer, 0, bout.getSize) readFrom(bh, in) @Benchmark - def num_FullyBufferedOutput_growing(bh: Blackhole): Unit = + def jdk_PipeOutputStream(bh: Blackhole): Unit = + // This works best. Adding a BufferedOutputStream or BufferedInputStream only makes it slower. 
+ val pout = new PipedOutputStream() + val pin = new PipedInputStream(pout) + val t1 = new Thread(() => writeTo(pout)) + val t2 = new Thread(() => readFrom(bh, pin)) + t1.start() + t2.start() + t1.join() + t2.join() + + @Benchmark + def perfIO_ArrayBufferedOutput_growing(bh: Blackhole): Unit = val out = BufferedOutput.growing() writeTo(out) readFrom(bh, out.toBufferedInput) @Benchmark - def num_FullyBufferedOutput_preallocated(bh: Blackhole): Unit = - val out = BufferedOutput.growing(count * 13) + def perfIO_ArrayBufferedOutput_preallocated(bh: Blackhole): Unit = + val out = BufferedOutput.growing(byteSize) writeTo(out) readFrom(bh, out.toBufferedInput) @Benchmark - def num_BlockBufferedOutput(bh: Blackhole): Unit = + def perfIO_BlockBufferedOutput(bh: Blackhole): Unit = val out = BufferedOutput.ofBlocks() writeTo(out) readFrom(bh, out.toBufferedInput) + + @Benchmark + def perfIO_PipeBufferedOutput(bh: Blackhole): Unit = + val out = BufferedOutput.pipe() + val in = out.toBufferedInput + val t1 = new Thread(() => writeTo(out)) + val t2 = new Thread(() => readFrom(bh, in)) + t1.start() + t2.start() + t1.join() + t2.join() diff --git a/core/src/main/java/perfio/BlockBufferedOutput.java b/core/src/main/java/perfio/BlockBufferedOutput.java index 7437cd1..c2c4b1b 100644 --- a/core/src/main/java/perfio/BlockBufferedOutput.java +++ b/core/src/main/java/perfio/BlockBufferedOutput.java @@ -59,11 +59,7 @@ final class BlockBufferedOutputIterator extends BufferIterator { private BufferedOutput block; private final BufferedOutput root; - BlockBufferedOutputIterator(BufferedOutput root) { - this.root = root; - this.block = skipEmpty(root.next); - if(this.block == null) this.block = root.next; - } + BlockBufferedOutputIterator(BufferedOutput root) { this.root = root; } private BufferedOutput skipEmpty(BufferedOutput b) { while(true) { @@ -76,14 +72,16 @@ private BufferedOutput skipEmpty(BufferedOutput b) { } } - public boolean next() { - if(block == root) return false; - var b = 
skipEmpty(block.next); + public Object next() { + if(block == root) return null; + var b = skipEmpty(block == null ? root.next : block.next); - if(b == null) return false; + if(b == null) return null; /* end of list: the Object-returning contract requires null; `false` would box to a non-null Boolean */ block = b; - return true; + return b; } + + public void returnBuffer(Object id) {} + public byte[] buffer() { return block.buf; } public int start() { return block.start; } public int end() { return block.pos; } diff --git a/core/src/main/java/perfio/BufferIterator.java b/core/src/main/java/perfio/BufferIterator.java index 09fffe1..2188158 100644 --- a/core/src/main/java/perfio/BufferIterator.java +++ b/core/src/main/java/perfio/BufferIterator.java @@ -1,65 +1,67 @@ package perfio; +import java.io.IOException; import java.io.InputStream; import java.util.Objects; -/// An iterator over a list of byte array buffers. It is initially positioned -/// on the first buffer upon creation. Calling [#next()] advances it to the next -/// buffer. An empty iterator consists of a single empty buffer, otherwise all -/// buffers are non-empty. +/// An iterator over a list of byte array buffers. It is initially positioned before the first +/// buffer. Calling [#next()] advances it to the next buffer. All buffers must be non-empty. An +/// empty iterator contains no buffers. abstract class BufferIterator { - /// Advance to the next non-empty buffer and return `true`, or return `false` - /// if the end of the list has been reached. - public abstract boolean next(); + /// Advance to the next non-empty buffer and return a buffer id, or null if the end of the list + /// has been reached. + public abstract Object next() throws IOException; - /// The current buffer + /// The current buffer. + /// The behavior is undefined before successfully retrieving a buffer with [#next()]. public abstract byte[] buffer(); - /// The first used index in the current buffer + /// The first used index in the current buffer. + /// The behavior is undefined before successfully retrieving a buffer with [#next()].
public abstract int start(); - /// The index after the last used index in the current buffer + /// The index after the last used index in the current buffer. + /// The behavior is undefined before successfully retrieving a buffer with [#next()]. public abstract int end(); /// The number of bytes in the current buffer, equivalent to `end() - start()` + /// The behavior is undefined before successfully retrieving a buffer with [#next()]. public int length() { return end() - start(); } + + /// Return the buffer with the given id so it can be reused. + public abstract void returnBuffer(Object id); } final class BufferIteratorInputStream extends InputStream { private final BufferIterator it; private byte[] buf; + private Object bufferId; private int pos, lim; - BufferIteratorInputStream(BufferIterator it) { - this.it = it; - updateBuffer(); - } - - private void updateBuffer() { - buf = it.buffer(); - pos = it.start(); - lim = it.end(); - } + BufferIteratorInputStream(BufferIterator it) { this.it = it; } // Make data available in the current buffer, advancing it if necessary. Returns // false if the end of the data has been reached. 
- private boolean request() { + private boolean request() throws IOException { if(lim - pos <= 0) { - if(!it.next()) return false; - updateBuffer(); + if(bufferId != null) it.returnBuffer(bufferId); + if((bufferId = it.next()) == null) return false; + buf = it.buffer(); + pos = it.start(); + lim = it.end(); } return true; } - public int read() { + public int read() throws IOException { if(!request()) return -1; return buf[pos++] & 0xFF; } @Override - public int read(byte[] b, int off, int len) { + public int read(byte[] b, int off, int len) throws IOException { Objects.checkFromIndexSize(off, len, b.length); if(len == 0) return 0; if(!request()) return -1; @@ -70,7 +72,7 @@ public int read(byte[] b, int off, int len) { } @Override - public long skip(long n) { + public long skip(long n) throws IOException { long rem = n; while(true) { if(rem <= available()) { diff --git a/core/src/main/java/perfio/BufferedOutput.java b/core/src/main/java/perfio/BufferedOutput.java index cd61118..137a50c 100644 --- a/core/src/main/java/perfio/BufferedOutput.java +++ b/core/src/main/java/perfio/BufferedOutput.java @@ -70,6 +70,25 @@ public static AccumulatingBufferedOutput ofBlocks(int initialBufferSize) { /// @see #ofBlocks(int) public static AccumulatingBufferedOutput ofBlocks() { return ofBlocks(DefaultBufferSize); } + /// Write data to a buffered pipe that can be read concurrently from another thread as an + /// [InputStream] or [BufferedInput]. Blocks of data are buffered and then transferred to the + /// reader. Note that this BufferedOutput and the opposing BufferedInput or InputStream are not + /// thread-safe, just like any other BufferedOutput/BufferedInput. Only their communication is. + /// This allows you to use one thread for writing and one thread for reading. Doing both from + /// the same thread is not recommended as it may deadlock when the internal buffer runs full or + /// empty. + /// + /// @param initialBufferSize Initial buffer size. 
The buffer is expanded later if necessary. + public static PipeBufferedOutput pipe(int initialBufferSize) { + return new PipeBufferedOutput(true, initialBufferSize, 2); + } + + /// Write data to a buffered pipe that can be read concurrently from another thread as an + /// [InputStream] or [BufferedInput] using the default initial buffer size. + /// @see #pipe(int) + public static PipeBufferedOutput pipe() { return pipe(DefaultBufferSize); } + + /// Write data to a given region of an existing byte array. The BufferedOutput is limited to the /// initial size. /// @@ -768,7 +787,7 @@ void closeUpstream() throws IOException { } -sealed abstract class CacheRootBufferedOutput extends BufferedOutput permits FlushingBufferedOutput, AccumulatingBufferedOutput { +sealed abstract class CacheRootBufferedOutput extends BufferedOutput permits FlushingBufferedOutput, AccumulatingBufferedOutput, PipeBufferedOutput { final int initialBufferSize; CacheRootBufferedOutput(byte[] buf, boolean bigEndian, int start, int pos, int lim, int initialBufferSize, boolean fixed, long totalLimit) { @@ -797,9 +816,11 @@ void returnToCache(BufferedOutput b) { /// Get a cached or new exclusive block. NestedBufferedOutput getExclusiveBlock() { - if(cachedExclusive == null) + if(cachedExclusive == null) { + //System.out.println("New exclusive block"); return new NestedBufferedOutput(new byte[cacheRoot.initialBufferSize], false, cacheRoot); - else { + } else { + //System.out.println("Cached exclusive block"); var b = cachedExclusive; cachedExclusive = b.next; return (NestedBufferedOutput)b; @@ -808,9 +829,11 @@ NestedBufferedOutput getExclusiveBlock() { /// Get a cached or new shared block. 
NestedBufferedOutput getSharedBlock() { - if(cachedShared == null) + if(cachedShared == null) { + //System.out.println("New shared block"); return new NestedBufferedOutput(null, true, cacheRoot); - else { + } else { + //System.out.println("Cached shared block"); var b = cachedShared; cachedShared = b.next; return (NestedBufferedOutput)b; diff --git a/core/src/main/java/perfio/PipeBufferedOutput.java b/core/src/main/java/perfio/PipeBufferedOutput.java new file mode 100644 index 0000000..a4ec2a0 --- /dev/null +++ b/core/src/main/java/perfio/PipeBufferedOutput.java @@ -0,0 +1,140 @@ +package perfio; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteOrder; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/// A [BufferedOutput] which makes its data available for concurrent reading from another thread +/// via [#toBufferedInput()] or [#toInputStream()]. 
+public final class PipeBufferedOutput extends CacheRootBufferedOutput { + private AtomicBoolean connected = new AtomicBoolean(false); + private final BlockingQueue queue; + private final BlockingQueue returnQueue = new LinkedBlockingQueue<>(); + + PipeBufferedOutput(boolean bigEndian, int initialBufferSize, int blockDepth) { + super(new byte[initialBufferSize], bigEndian, 0, 0, initialBufferSize, initialBufferSize, false, Long.MAX_VALUE); + queue = new ArrayBlockingQueue<>(blockDepth); + } + + @Override + public PipeBufferedOutput order(ByteOrder order) { + super.order(order); + return this; + } + + void flushBlocks(boolean forceFlush) throws IOException { + //TODO split buffer to forceFlush + while(next != this) { + var b = next; + if(!b.closed) return; + var blen = b.pos - b.start; + if(b.sharing == SHARING_LEFT) { + var n = b.next; + n.start = b.start; + n.totalFlushed -= blen; + b.unlinkAndReturn(); + } else if(blen != 0) { + b.unlinkOnly(); + put(b); + } else b.unlinkAndReturn(); + } + if(closed) put(this); + } + + private void put(BufferedOutput b) throws IOException { + BufferedOutput r; + while((r = returnQueue.poll()) != null) + if(r != this) cacheRoot.returnToCache(r); + try { queue.put(b); } + catch(InterruptedException ex) { throw new IOException("Pipe transfer interrupted", ex); } + } + + @Override + void flushAndGrow(int count) throws IOException { + // switch to a new buffer if this one is sufficiently filled + if(lim-pos <= initialBufferSize/2) { + var pre = this.prev; + checkState(); + var b = cacheRoot.getExclusiveBlock(); + totalFlushed += (pos-start); + buf = b.reinit(buf, bigEndian, start, pos, lim, sharing, 0L, 0L, true, root, null); + b.closed = true; + b.insertBefore(this); + start = 0; + pos = 0; + lim = buf.length; + if(pre == root) root.flushBlocks(false); + } else super.flushAndGrow(count); + } + + @Override + void closeUpstream() throws IOException { + super.closeUpstream(); + put(QueuedBufferIterator.END_MARKER); + } + + void 
flushUpstream() {} + + /// Create a new [InputStream] that reads the data as it is written. This method is + /// thread-safe. Only one call to [#toInputStream()] or [#toBufferedInput()] is allowed. + public InputStream toInputStream() throws IOException { + return new BufferIteratorInputStream(bufferIterator()); + } + + /// Create a new [BufferedInput] that reads the data as it is written. This method is + /// thread-safe. Only one call to [#toInputStream()] or [#toBufferedInput()] is allowed. + public BufferedInput toBufferedInput() throws IOException { + return new SwitchingHeapBufferedInput(bufferIterator(), bigEndian); + } + + BufferIterator bufferIterator() throws IOException { + if(!connected.compareAndSet(false, true)) throw new IOException("Pipe is already connected"); + return new QueuedBufferIterator(queue, returnQueue); + } +} + + +final class QueuedBufferIterator extends BufferIterator { + public static final BufferedOutput END_MARKER = BufferedOutput.growing(0); + + static { try { END_MARKER.close(); } catch (IOException ignored) {} } + + private final BlockingQueue queue, returnQueue; + private byte[] buffer; + private int start, end; + private boolean finished; + + QueuedBufferIterator(BlockingQueue queue, BlockingQueue returnQueue) { + this.queue = queue; + this.returnQueue = returnQueue; + } + + public byte[] buffer() { return buffer; } + public int start() { return start; } + public int end() { return end; } + + public Object next() throws IOException { + if(finished) return null; + BufferedOutput b; + try { b = queue.take(); } + catch(InterruptedException ex) { throw new IOException("Pipe transfer interrupted", ex); } + //System.out.println("Took "+(b.pos-b.start)); + if(b == END_MARKER) { + finished = true; + buffer = null; + return null; + } + buffer = b.buf; + start = b.start; + end = b.pos; + return b; + } + + public void returnBuffer(Object id) { + if(!finished) returnQueue.offer((BufferedOutput)id); + } +} diff --git 
a/core/src/main/java/perfio/SwitchingHeapBufferedInput.java b/core/src/main/java/perfio/SwitchingHeapBufferedInput.java index d636589..db5c1dd 100644 --- a/core/src/main/java/perfio/SwitchingHeapBufferedInput.java +++ b/core/src/main/java/perfio/SwitchingHeapBufferedInput.java @@ -6,9 +6,10 @@ final class SwitchingHeapBufferedInput extends HeapBufferedInput { private final BufferIterator it; private int seamOverlap = 0; + private Object bufferId1, bufferId2; SwitchingHeapBufferedInput(BufferIterator it, boolean bigEndian) { - super(it.buffer(), it.start(), it.end(), Long.MAX_VALUE, null, null, bigEndian); + super(new byte[0], 0, 0, Long.MAX_VALUE, null, null, bigEndian); this.it = it; } @@ -35,19 +36,29 @@ void prepareAndFillBuffer(int count) throws IOException { if(totalBuffered < totalReadLimit) { while(available() < count) { if(pos == lim) { - if(seamOverlap != 0) { + if(seamOverlap == 0) { + //assert(bufferId2 == null); + if(bufferId1 != null) it.returnBuffer(bufferId1); + if((bufferId1 = it.next()) == null) break; updateBuffer(); - pos += seamOverlap; - seamOverlap = 0; } else { - if(!it.next()) break; + //assert(bufferId1 != null); + //assert(bufferId2 != null); + it.returnBuffer(bufferId1); + bufferId1 = bufferId2; + bufferId2 = null; updateBuffer(); + pos += seamOverlap; + seamOverlap = 0; } } else { if(pos + count > buf.length) shiftOrGrow(count); if(seamOverlap == 0) { - if(!it.next()) break; + //assert(bufferId1 != null); + //assert(bufferId2 == null); + it.returnBuffer(bufferId1); + if((bufferId1 = it.next()) == null) break; var rem = count - available(); var nlen = it.length(); if(rem < nlen) { // at least 1 byte will remain in `next` -> create a seam @@ -62,8 +73,13 @@ void prepareAndFillBuffer(int count) throws IOException { totalBuffered += nlen; } } else { // existing seam + //assert(bufferId1 != null); + //assert(bufferId2 != null); if(lim - pos <= seamOverlap) { // pos is in the seam -> switch to next buffer var a = lim - pos; + 
it.returnBuffer(bufferId1); + bufferId1 = bufferId2; + bufferId2 = null; updateBuffer(); pos -= a; pos += seamOverlap; @@ -82,6 +98,8 @@ void prepareAndFillBuffer(int count) throws IOException { lim += nlen; totalBuffered += nlen; seamOverlap = 0; + it.returnBuffer(bufferId2); + bufferId2 = null; } } } diff --git a/test/src/test/scala/perfio/BufferedOutputTest.scala b/test/src/test/scala/perfio/BufferedOutputTest.scala index 58d5add..79c0dac 100644 --- a/test/src/test/scala/perfio/BufferedOutputTest.scala +++ b/test/src/test/scala/perfio/BufferedOutputTest.scala @@ -160,4 +160,5 @@ object BufferedOutputTest: Array[Any]("growing_64", (_: TestData).createGrowingBufferedOutput(64)), Array[Any]("fixed_32768", (_: TestData).createFixedBufferedOutput(new Array[Byte](32768))), Array[Any]("block_64", (_: TestData).createBlockBufferedOutput(64)), + Array[Any]("pipe_64", (_: TestData).createPipeBufferedOutput(64)), ) diff --git a/test/src/test/scala/perfio/TestUtil.scala b/test/src/test/scala/perfio/TestUtil.scala index 03c99bd..71dee2c 100644 --- a/test/src/test/scala/perfio/TestUtil.scala +++ b/test/src/test/scala/perfio/TestUtil.scala @@ -5,6 +5,8 @@ import org.junit.Assert import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, FileOutputStream, InputStream} import java.lang.foreign.{Arena, MemorySegment, ValueLayout} import java.nio.{ByteBuffer, ByteOrder} +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} import scala.reflect.ClassTag trait TestUtil: @@ -81,6 +83,14 @@ class TestData(val bytes: Array[Byte], val name: String, owner: Class[?]): Assert.assertArrayEquals(bytes, in.readAllBytes()) (bo, checker) + def createPipeBufferedOutput(initialBufferSize: Int = 64): (BufferedOutput, () => Unit) = + val bo = BufferedOutput.pipe(initialBufferSize) + val res = Future { bo.toInputStream.readAllBytes() }(ExecutionContext.global) + val checker = () => + bo.close() + 
Assert.assertArrayEquals(bytes, Await.result(res, Duration.Inf)) + (bo, checker) + def createFixedBufferedOutput(buf: Array[Byte], start: Int = 0, len: Int = -1): (BufferedOutput, () => Unit) = val bo = BufferedOutput.ofArray(buf, start, if(len == -1) buf.length-start else len) val checker = () =>