Skip to content

Commit

Permalink
Dynamically size the read buffer based on contention
Browse files Browse the repository at this point in the history
Previously the read buffer consisted of a fixed number of segments, each
consisting of a bounded ring buffer. This provided performance under heavy
load, at the cost of high memory when the cache was created. For caches that
are not heavily contended this is wasteful, especially when many are constructed.

This fixed cost is replaced with a dynamic approach, which starts at a single
buffer and expands as needed. The upper limit results in the same number of
read buffers, but only under high load. This dramatically reduces memory usage.

The approach is based on j.u.c.Striped64, which provides the mechanism for
Java's high performance 64-bit atomic counters.
  • Loading branch information
ben-manes committed Mar 30, 2015
1 parent bb846eb commit a419a41
Show file tree
Hide file tree
Showing 20 changed files with 623 additions and 210 deletions.
6 changes: 0 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ subprojects { proj ->

sourceCompatibility = JavaVersion.VERSION_1_8

tasks.withType(JavaCompile) {
if (!System.env.'CI') {
options.incremental = !rootProject.hasProperty('release')
}
}

group = 'com.github.ben-manes.caffeine'
version.with {
major = 1 // incompatible API changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

import com.github.benmanes.caffeine.cache.buffer.Buffer;
import com.github.benmanes.caffeine.cache.buffer.ReadBuffer;
import com.github.benmanes.caffeine.cache.buffer.BufferType;

/**
Expand All @@ -46,7 +46,7 @@
public class ReadBufferBenchmark {

@Param BufferType bufferType;
Buffer buffer;
ReadBuffer buffer;

@Setup
public void setup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,191 +20,110 @@
import java.util.function.Consumer;

/**
* A multiple-producer / single-consumer bounded buffer that rejects new elements if it is full or
* fails spuriously due to contention. Unlike a queue and stack, a buffer does not guarantee an
* ordering of elements in either FIFO or LIFO order.
* <p>
* Beware that it is the responsibility of the caller to ensure that a consumer has exclusive read
* access to the buffer. This implementation does <em>not</em> include fail-fast behavior to guard
* against incorrect consumer usage.
* A striped, non-blocking, bounded buffer.
*
* @param <E> the type of elements maintained by this buffer
* @author [email protected] (Ben Manes)
*/
final class BoundedBuffer<E> {

final class BoundedBuffer<E> extends StripedBuffer<E> {
/*
* A segmented, non-blocking, circular ring buffer is used to store the elements being transfered
* by the producers to the consumer. The monotonically increasing count of reads and writes allow
* indexing sequentially to the next element location. The arrays use power-of-two sizing for
* quickly determining to the proper location.
* A circular ring buffer stores the elements being transfered by the producers to the consumer.
* The monotonically increasing count of reads and writes allow indexing sequentially to the next
* element location based upon a power-of-two sizing.
*
* The producers race to read the counts, check if there is available capacity, and if so then try
* once to CAS to the next write count. If the increment is successful then the producer lazily
* publishes the next element. The producer does not retry or block when unsuccessful due to a
* failed CAS or the buffer being full.
* publishes the element. The producer does not retry or block when unsuccessful due to a failed
* CAS or the buffer being full.
*
* The consumer reads the counts and takes the available elements. The clearing of the elements
* and the next read count are lazily set.
*
* To further increase concurrency the buffer is internally segmented into multiple ring buffers.
* The number of segments is determined as a size that minimize contention that may cause spurious
* failures for producers. The segment is chosen by a hash of the thread's id.
* This implementation is striped to further increase concurrency by rehashing and dynamically
* adding new buffers when contention is detected, up to an internal maximum. When rehashing in
* order to discover an available buffer, the producer may retry adding its element to determine
* whether it found a satisfactory buffer or if resizing is necessary.
*/

/** The number of CPUs */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/** The number of read buffers to use. */
static final int NUMBER_OF_SEGMENTS = 4 * ceilingNextPowerOfTwo(NCPU);

/** Mask value for indexing into the read buffers. */
static final int SEGMENT_MASK = NUMBER_OF_SEGMENTS - 1;

/** The maximum number of pending reads per buffer. */
static final int RING_BUFFER_SIZE = 32;
static final int BUFFER_SIZE = 32;

/** Mask value for indexing into the read buffer. */
static final int RING_BUFFER_MASK = RING_BUFFER_SIZE - 1;
static final int BUFFER_MASK = BUFFER_SIZE - 1;

static int ceilingNextPowerOfTwo(int x) {
// From Hacker's Delight, Chapter 3, Harry S. Warren Jr.
return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1));
@Override
protected Buffer<E> create(E e) {
return new RingBuffer<>(e);
}

final AtomicLong[] readCount;
final AtomicLong[] writeCount;
final AtomicReference<E>[][] table;

@SuppressWarnings({"unchecked", "cast", "rawtypes"})
public BoundedBuffer() {
readCount = new AtomicLong[NUMBER_OF_SEGMENTS];
writeCount = new AtomicLong[NUMBER_OF_SEGMENTS];
table = new AtomicReference[NUMBER_OF_SEGMENTS][RING_BUFFER_SIZE];
for (int i = 0; i < NUMBER_OF_SEGMENTS; i++) {
table[i] = new AtomicReference[RING_BUFFER_SIZE];
for (int j = 0; j < RING_BUFFER_SIZE; j++) {
table[i][j] = new AtomicReference<>();
static final class RingBuffer<E> implements Buffer<E> {
final AtomicLong readCounter;
final AtomicLong writeCounter;
final AtomicReference<E>[] buffer;

@SuppressWarnings({"unchecked", "cast", "rawtypes"})
public RingBuffer(E e) {
readCounter = new AtomicLong();
writeCounter = new AtomicLong(1);
buffer = new AtomicReference[BUFFER_SIZE];
for (int i = 0; i < BUFFER_SIZE; i++) {
buffer[i] = new AtomicReference<>();
}
readCount[i] = new AtomicLong();
writeCount[i] = new AtomicLong();
buffer[0].lazySet(e);
}
}

/**
* Inserts the specified element into this buffer if it is possible to do so immediately without
* violating capacity restrictions. The addition is allowed to fail spuriously if multiple
* threads insert concurrently.
*
* @param e the element to add
* @return {@code true} if the element was or could have been added; {@code false} if full
*/
public boolean submit(E e) {
final int segmentIndex = segmentIndex();
final AtomicLong readCounter = readCount[segmentIndex];
final AtomicLong writeCounter = writeCount[segmentIndex];

long head = readCounter.get();
long tail = writeCounter.get();
long size = (tail - head);
if (size >= RING_BUFFER_SIZE) {
return false;
}
if (writeCounter.compareAndSet(tail, tail + 1)) {
int index = (int) (tail & RING_BUFFER_MASK);
table[segmentIndex][index].lazySet(e);
@Override
public int offer(E e) {
long head = readCounter.get();
long tail = writeCounter.get();
long size = (tail - head);
if (size >= BUFFER_SIZE) {
return Buffer.FULL;
}
if (writeCounter.compareAndSet(tail, tail + 1)) {
int index = (int) (tail & BUFFER_MASK);
buffer[index].lazySet(e);
return Buffer.SUCCESS;
}
return Buffer.FAILED;
}
return true;
}

/**
* Drains the buffer, sending each element to the consumer for processing. The caller must ensure
* that a consumer has exclusive read access to the buffer.
*
* @param consumer the action to perform on each element
*/
public void drain(Consumer<E> consumer) {
final int start = segmentIndex();
final int end = start + NUMBER_OF_SEGMENTS;
for (int i = start; i < end; i++) {
drainSegment(consumer, i & SEGMENT_MASK);
@Override
public void drain(Consumer<E> consumer) {
long head = readCounter.get();
long tail = writeCounter.get();
long size = (tail - head);
if (size == 0) {
return;
}
do {
int index = (int) (head & BUFFER_MASK);
AtomicReference<E> slot = buffer[index];
E e = slot.get();
if (e == null) {
// not published yet
break;
}
slot.lazySet(null);
consumer.accept(e);
head++;
} while (head != tail);
readCounter.lazySet(head);
}
}

/**
* Drains an segment.
*
* @param consumer the action to perform on each element
* @param segmentIndex the segment index in the table
*/
private void drainSegment(Consumer<E> consumer, int segmentIndex) {
final AtomicLong readCounter = readCount[segmentIndex];
final AtomicLong writeCounter = writeCount[segmentIndex];

long head = readCounter.get();
long tail = writeCounter.get();
long size = (tail - head);
if (size == 0) {
return;
@Override
public int size() {
return writes() - reads();
}
do {
int index = (int) (head & RING_BUFFER_MASK);
AtomicReference<E> slot = table[segmentIndex][index];
E e = slot.get();
if (e == null) {
// not published yet
break;
}
slot.lazySet(null);
consumer.accept(e);
head++;
} while (head != tail);
readCounter.lazySet(head);
}

/**
* Returns the number of elements residing in the buffer.
*
* @return the number of elements in this buffer
*/
public int size() {
return writes() - reads();
}

/**
* Returns the number of elements that have been written to the buffer.
*
* @return the number of elements written to this buffer
*/
public int writes() {
int writes = 0;
for (AtomicLong counter : writeCount) {
writes += counter.intValue();
@Override
public int reads() {
return readCounter.intValue();
}
return writes;
}

/**
* Returns the number of elements that have been read from the buffer.
*
* @return the number of elements read from this buffer
*/
public int reads() {
int reads = 0;
for (AtomicLong counter : readCount) {
reads += counter.intValue();
@Override
public int writes() {
return writeCounter.intValue();
}
return reads;
}

/**
* Returns the index to the ring buffer to record into. Uses a one-step FNV-1a hash code
* (http://www.isthe.com/chongo/tech/comp/fnv) based on the current thread's id. These hash codes
* have more uniform distribution properties with respect to small moduli (here 1-31) than do
* other simple hashing functions.
*/
static int segmentIndex() {
int id = (int) Thread.currentThread().getId();
return ((id ^ 0x811c9dc5) * 0x01000193) & SEGMENT_MASK;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ abstract class BoundedLocalCache<K, V> extends AbstractMap<K, V> implements Loca

// The policy management
final AtomicReference<DrainStatus> drainStatus;
final BoundedBuffer<Node<K, V>> readBuffer;
final Buffer<Node<K, V>> readBuffer;
final NonReentrantLock evictionLock;
final Weigher<K, V> weigher;
final boolean isAsync;
Expand All @@ -143,7 +143,6 @@ abstract class BoundedLocalCache<K, V> extends AbstractMap<K, V> implements Loca
protected BoundedLocalCache(Caffeine<K, V> builder,
@Nullable CacheLoader<? super K, V> loader, boolean isAsync) {
this.isAsync = isAsync;
readBuffer = new BoundedBuffer<>();
weigher = builder.getWeigher(isAsync);
evictionLock = new NonReentrantLock();
id = tracer().register(builder.name());
Expand All @@ -153,6 +152,9 @@ protected BoundedLocalCache(Caffeine<K, V> builder,
builder.isStrongValues(), builder.isWeakValues(), builder.isSoftValues(),
builder.expiresAfterAccess(), builder.expiresAfterWrite(), builder.refreshes(),
builder.evicts(), (isAsync && builder.evicts()) || builder.isWeighted());
readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess()
? new BoundedBuffer<>()
: Buffer.disabled();
}

/** Returns if the node's value is currently being computed, asynchronously. */
Expand Down Expand Up @@ -469,23 +471,28 @@ void afterRead(Node<K, V> node, boolean recordHit) {
long now = ticker().read();
node.setAccessTime(now);

boolean delayable = readBuffer.submit(node);
boolean delayable = (readBuffer.offer(node) != Buffer.FULL);
drainOnReadIfNeeded(delayable);
refreshIfNeeded(node, now);
}

if (refreshAfterWrite()) {
long writeTime = node.getWriteTime();
if (((now - writeTime) > refreshAfterWriteNanos()) && node.casWriteTime(writeTime, now)) {
executor().execute(() -> {
K key = node.getKey();
if ((key != null) && node.isAlive()) {
try {
computeIfPresent(key, cacheLoader()::reload);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during reload", t);
}
/** Asynchronously refreshes the entry if eligible. */
void refreshIfNeeded(Node<K, V> node, long now) {
if (!refreshAfterWrite()) {
return;
}
long writeTime = node.getWriteTime();
if (((now - writeTime) > refreshAfterWriteNanos()) && node.casWriteTime(writeTime, now)) {
executor().execute(() -> {
K key = node.getKey();
if ((key != null) && node.isAlive()) {
try {
computeIfPresent(key, cacheLoader()::reload);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during reload", t);
}
});
}
}
});
}
}

Expand Down
Loading

0 comments on commit a419a41

Please sign in to comment.