Skip to content

Commit

Permalink
PipeBufferedOutput
Browse files Browse the repository at this point in the history
  • Loading branch information
szeiger committed Nov 26, 2024
1 parent f405a59 commit d58be4d
Show file tree
Hide file tree
Showing 11 changed files with 481 additions and 174 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ perfIO provides buffered streaming I/O abstractions for both binary and text dat
| BufferedInput | BufferedInputStream, ByteArrayInputStream, DataInputStream, LimitedInputStream* |
| BufferedOutput | BufferedOutputStream, DataOutputStream |
| AccumulatingBufferedOutput | - |
| PipeBufferedOutput | PipeOutputStream |
| ArrayBufferedOutput | ByteArrayOutputStream |
| LineTokenizer | BufferedReader + InputStreamReader |
| TextOutput | PrintWriter + BufferedWriter + OutputStreamWriter |
Expand Down
185 changes: 185 additions & 0 deletions bench/src/main/scala/perfio/BenchmarkDataSet.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package perfio

import com.esotericsoftware.kryo.io.Output
import org.openjdk.jmh.infra.Blackhole

import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStream}
import java.nio.ByteBuffer

abstract class BenchmarkDataSet:
def byteSize: Int
def writeTo(out: OutputStream): Unit
def writeTo(out: BufferedOutput): Unit
def writeTo(out: Output): Unit // Kryo: little endian (safe) or native endian (unsafe)
def writeTo(out: ByteBuffer): Unit
def readFrom(bh: Blackhole, bin: BufferedInput): Unit
def readFrom(bh: Blackhole, in: InputStream): Unit

object BenchmarkDataSet:
def forName(s: String): BenchmarkDataSet = s match
case "num" => NumDataSet
case "bytes" => BytesDataSet
case "chunks" => ChunksDataSet


object NumDataSet extends BenchmarkDataSet:
val count = 20000000
val byteSize = count * 13

def writeTo(out: OutputStream): Unit =
val dout = new DataOutputStream(out)
var i = 0
while i < count do
dout.writeByte(i)
dout.writeInt(i+100)
dout.writeLong(i+101)
i += 1
dout.close()

def writeTo(out: Output): Unit =
var i = 0
while i < count do
out.writeByte(i)
out.writeInt(i+100)
out.writeLong(i+101)
i += 1
out.close()

def writeTo(out: ByteBuffer): Unit =
var i = 0
while i < count do
out.put(i.toByte)
out.putInt(i+100)
out.putLong(i+101)
i += 1

def writeTo(out: BufferedOutput): Unit =
var i = 0
while i < count do
out.int8(i.toByte)
out.int32b(i+100)
out.int64b(i+101)
i += 1
out.close()

def readFrom(bh: Blackhole, bin: BufferedInput): Unit = {
var i = 0
while i < count do
bh.consume(bin.int8())
bh.consume(bin.int32b())
bh.consume(bin.int64b())
i += 1
bin.close()
}

def readFrom(bh: Blackhole, in: InputStream): Unit = {
val din = new DataInputStream(in)
var i = 0
while i < count do
bh.consume(din.readByte())
bh.consume(din.readInt())
bh.consume(din.readLong())
i += 1
din.close()
}


object BytesDataSet extends BenchmarkDataSet:
val count = 100000000
val byteSize = count

def writeTo(out: OutputStream): Unit =
var i = 0
while i < count do
out.write(i.toByte)
i += 1
out.close()

def writeTo(out: Output): Unit =
var i = 0
while i < count do
out.writeByte(i.toByte)
i += 1
out.close()

def writeTo(out: ByteBuffer): Unit =
var i = 0
while i < count do
out.put(i.toByte)
i += 1

def writeTo(out: BufferedOutput): Unit =
var i = 0
while i < count do
out.int8(i.toByte)
i += 1
out.close()

def readFrom(bh: Blackhole, bin: BufferedInput): Unit = {
var i = 0
while i < count do
bh.consume(bin.int8())
i += 1
bin.close()
}

def readFrom(bh: Blackhole, in: InputStream): Unit = {
var i = 0
while i < count do
bh.consume(in.read().toByte)
i += 1
in.close()
}


object ChunksDataSet extends BenchmarkDataSet:
val count = 100000
val chunk = Array.tabulate[Byte](1024)(_.toByte)
val byteSize = count * 1024

def writeTo(out: OutputStream): Unit =
var i = 0
while i < count do
out.write(chunk)
i += 1
out.close()

def writeTo(out: Output): Unit =
var i = 0
while i < count do
out.write(chunk)
i += 1
out.close()

def writeTo(out: ByteBuffer): Unit =
var i = 0
while i < count do
out.put(chunk)
i += 1

def writeTo(out: BufferedOutput): Unit =
var i = 0
while i < count do
out.write(chunk)
i += 1
out.close()

def readFrom(bh: Blackhole, bin: BufferedInput): Unit = {
val a = new Array[Byte](1024)
var i = 0
while i < count do
bin.bytes(a, 0, a.length)
bh.consume(a)
i += 1
bin.close()
}

def readFrom(bh: Blackhole, in: InputStream): Unit = {
val a = new Array[Byte](1024)
var i = 0
while i < count do
in.read(a)
bh.consume(a)
i += 1
in.close()
}
92 changes: 19 additions & 73 deletions bench/src/main/scala/perfio/BufferedOutputNumBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,61 +23,22 @@ import java.util.concurrent.TimeUnit
@State(Scope.Benchmark)
class BufferedOutputNumBenchmark extends BenchUtil:

val count = 20000000
val byteSize = count * 13

// JDK: always big endian
private def writeTo(out: DataOutputStream): Unit =
var i = 0
while i < count do
out.writeByte(i)
out.writeInt(i+100)
out.writeLong(i+101)
i += 1
out.close()

// Kryo: little endian (safe) or native endian (unsafe)
private def writeTo(out: Output): Unit =
var i = 0
while i < count do
out.writeByte(i)
out.writeInt(i+100)
out.writeLong(i+101)
i += 1
out.close()

private def writeTo(out: BufferedOutput): Unit =
var i = 0
while i < count do
out.int8(i.toByte)
out.int32(i+100)
out.int64(i+101)
i += 1
out.close()

private def writeInternalTo(out: BufferedOutput): Unit =
var i = 0
while i < count do
val p = out.fwd(13)
out.buf(p) = i.toByte
BufferUtil.BA_INT_BIG.set(out.buf, p+1, i+100)
BufferUtil.BA_LONG_BIG.set(out.buf, p+5, (i+101).toLong)
i += 1
out.close()
@Param(Array("num"))
var dataSet: String = null
final lazy val data = BenchmarkDataSet.forName(dataSet)
import data._

@Benchmark
def array_DataOutputStream_growing(bh: Blackhole): Unit =
val bout = new MyByteArrayOutputStream
val out = new DataOutputStream(bout)
writeTo(out)
writeTo(bout)
bh.consume(bout.getSize)
bh.consume(bout.getBuffer)

@Benchmark
def array_DataOutputStream_preallocated(bh: Blackhole): Unit =
val bout = new MyByteArrayOutputStream(count * 13)
val out = new DataOutputStream(bout)
writeTo(out)
val bout = new MyByteArrayOutputStream(byteSize)
writeTo(bout)
bh.consume(bout.getSize)
bh.consume(bout.getBuffer)

Expand All @@ -91,7 +52,7 @@ class BufferedOutputNumBenchmark extends BenchUtil:

@Benchmark
def array_Kryo_preallocated(bh: Blackhole): Unit =
val out = new Output(count * 13)
val out = new Output(byteSize)
writeTo(out)
bh.consume(out.position())

Expand All @@ -105,8 +66,8 @@ class BufferedOutputNumBenchmark extends BenchUtil:

@Benchmark
def array_KryoUnsafe_preallocated(bh: Blackhole): Unit =
val bb = ByteBuffer.allocate(count * 13)
val out = new UnsafeOutput(count * 13)
val bb = ByteBuffer.allocate(byteSize)
val out = new UnsafeOutput(byteSize)
writeTo(out)
bh.consume(out.position())

Expand All @@ -120,7 +81,7 @@ class BufferedOutputNumBenchmark extends BenchUtil:

@Benchmark
def array_KryoBB_preallocated(bh: Blackhole): Unit =
val bb = ByteBuffer.allocate(count * 13)
val bb = ByteBuffer.allocate(byteSize)
val out = new ByteBufferOutput(bb)
writeTo(out)
bh.consume(out.position())
Expand All @@ -135,8 +96,8 @@ class BufferedOutputNumBenchmark extends BenchUtil:

@Benchmark
def array_KryoBBUnsafe_preallocated(bh: Blackhole): Unit =
val bb = ByteBuffer.allocate(count * 13)
val out = new UnsafeByteBufferOutput(count * 13)
val bb = ByteBuffer.allocate(byteSize)
val out = new UnsafeByteBufferOutput(byteSize)
writeTo(out)
bh.consume(out.position())

Expand All @@ -150,20 +111,12 @@ class BufferedOutputNumBenchmark extends BenchUtil:

@Benchmark
def array_FlushingBufferedOutput_fixed(bh: Blackhole): Unit =
val bout = new MyByteArrayOutputStream(count * 13)
val bout = new MyByteArrayOutputStream(byteSize)
val out = BufferedOutput.of(bout)
writeTo(out)
bh.consume(bout.getSize)
bh.consume(bout.getBuffer)

@Benchmark
def array_FlushingBufferedOutput_internal_fixed(bh: Blackhole): Unit =
val bout = new MyByteArrayOutputStream(count * 13)
val out = BufferedOutput.of(bout)
writeInternalTo(out)
bh.consume(bout.getSize)
bh.consume(bout.getBuffer)

@Benchmark
def array_FullyBufferedOutput_growing(bh: Blackhole): Unit =
val out = BufferedOutput.growing()
Expand All @@ -173,36 +126,29 @@ class BufferedOutputNumBenchmark extends BenchUtil:

@Benchmark
def array_FullyBufferedOutput_growing_preallocated(bh: Blackhole): Unit =
val out = BufferedOutput.growing(count*13)
val out = BufferedOutput.growing(byteSize)
writeTo(out)
bh.consume(out.buffer)
bh.consume(out.length)

@Benchmark
def array_FullyBufferedOutput_fixed(bh: Blackhole): Unit =
val out = BufferedOutput.ofArray(new Array[Byte](count*13))
val out = BufferedOutput.ofArray(new Array[Byte](byteSize))
writeTo(out)
bh.consume(out.buffer)
bh.consume(out.length)

@Benchmark
def array_ByteBuffer(bh: Blackhole): Unit =
val out = ByteBuffer.allocate(count*13)
var i = 0
while i < count do
out.put(i.toByte)
out.putInt(i+100)
out.putLong(i+101)
i += 1
val out = ByteBuffer.allocate(byteSize)
writeTo(out)
bh.consume(out)

@Benchmark
def file_DataOutputStream(bh: Blackhole): Unit =
val fout = new FileOutputStream("/dev/null")
val bout = new BufferedOutputStream(fout)
val out = new DataOutputStream(bout)
writeTo(out)
out.close()
writeTo(bout)

@Benchmark
def file_FlushingBufferedOutput(bh: Blackhole): Unit =
Expand Down
Loading

0 comments on commit d58be4d

Please sign in to comment.