Skip to content

Commit

Permalink
Implement sharing-retaining JDK serialization for BufferRecyclerPools.
Browse files Browse the repository at this point in the history
  • Loading branch information
cowtowncoder committed Aug 31, 2023
1 parent 58e6f30 commit eb2461b
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 54 deletions.
133 changes: 88 additions & 45 deletions src/main/java/com/fasterxml/jackson/core/util/BufferRecyclerPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.io.Serializable;
import java.util.Deque;
import java.util.Queue;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -122,7 +122,7 @@ private ThreadLocalPool() { }

protected Object readResolve() { return GLOBAL; }

// // // JDK serialization support
// // // Actual API implementation

@SuppressWarnings("deprecation")
@Override
Expand Down Expand Up @@ -164,7 +164,7 @@ public static BufferRecyclerPool shared() {
protected Object readResolve() { return GLOBAL; }

// // // Actual API implementation

@Override
public BufferRecycler acquireBufferRecycler() {
// Could link back to this pool as marker? For now just leave back-ref empty
Expand All @@ -177,23 +177,55 @@ public void releaseBufferRecycler(BufferRecycler recycler) {
}
}

/**
* Intermediate base class for instances that are stateful and require
* special handling with respect to JDK serialization, to retain
* "global" reference distinct from non-shared ones.
*/
abstract class StatefulImplBase implements BufferRecyclerPool {
private static final long serialVersionUID = 1L;

protected final static int SERIALIZATION_SHARED = -1;

protected final static int SERIALIZATION_NON_SHARED = 1;

/**
* Value that indicates basic aspects of pool for JDK serialization;
* either marker for shared/non-shared, or possibly bounded size;
* depends on sub-class.
*/
protected final int _serialization;

protected StatefulImplBase(int serialization) {
_serialization = serialization;
}

protected Optional<BufferRecyclerPool> _resolveToShared(BufferRecyclerPool shared) {
if (_serialization == SERIALIZATION_SHARED) {
return Optional.of(shared);
}
return Optional.empty();
}
}

/**
* {@link BufferRecyclerPool} implementation that uses
* {@link ConcurrentLinkedDeque} for recycling instances.
*<p>
* Pool is unbounded: see {@link BufferRecyclerPool} what this means.
*/
class ConcurrentDequePool implements BufferRecyclerPool
class ConcurrentDequePool extends StatefulImplBase
{
private static final long serialVersionUID = 1L;

private static final BufferRecyclerPool GLOBAL = new ConcurrentDequePool();
private static final ConcurrentDequePool GLOBAL = new ConcurrentDequePool(SERIALIZATION_SHARED);

private final transient Deque<BufferRecycler> pool;

// // // Life-cycle (constructors, factory methods)

protected ConcurrentDequePool() {
protected ConcurrentDequePool(int serialization) {
super(serialization);
pool = new ConcurrentLinkedDeque<>();
}

Expand All @@ -204,7 +236,7 @@ protected ConcurrentDequePool() {
*
* @return Shared pool instance
*/
public static BufferRecyclerPool shared() {
public static ConcurrentDequePool shared() {
return GLOBAL;
}

Expand All @@ -213,30 +245,28 @@ public static BufferRecyclerPool shared() {
*
* @return Newly constructed, non-shared pool instance
*/
public static BufferRecyclerPool nonShared() {
return new ConcurrentDequePool();
public static ConcurrentDequePool nonShared() {
return new ConcurrentDequePool(SERIALIZATION_NON_SHARED);
}

// // // JDK serialization support

/**
* To avoid serializing pool contents we made {@code pool} {@code transient};
* to compensate, need to re-create proper instance using constructor.
* Make sure to re-link to global/shared or non-shared.
*/
protected Object readResolve() {
return new ConcurrentDequePool();
return _resolveToShared(GLOBAL).orElseGet(() -> nonShared());
}

// // // Actual API implementation

@Override
public BufferRecycler acquireBufferRecycler() {
return getBufferRecycler().withPool(this);
}

private BufferRecycler getBufferRecycler() {
BufferRecycler bufferRecycler = pool.pollFirst();
return bufferRecycler != null ? bufferRecycler : new BufferRecycler();
if (bufferRecycler == null) {
bufferRecycler = new BufferRecycler();
}
return bufferRecycler.withPool(this);
}

@Override
Expand All @@ -248,24 +278,25 @@ public void releaseBufferRecycler(BufferRecycler bufferRecycler) {
/**
* {@link BufferRecyclerPool} implementation that uses
* a lock free linked list for recycling instances.
*<p>
* Pool is unbounded: see {@link BufferRecyclerPool} what this means.
* Pool is unbounded: see {@link BufferRecyclerPool} for
* details on what this means.
*/
class LockFreePool implements BufferRecyclerPool
class LockFreePool extends StatefulImplBase
{
private static final long serialVersionUID = 1L;

/**
* Globally shared pool instance.
*/
private static final BufferRecyclerPool GLOBAL = new LockFreePool();
private static final LockFreePool GLOBAL = new LockFreePool(SERIALIZATION_SHARED);

// Needs to be transient to avoid JDK serialization from writing it out
private final transient AtomicReference<LockFreePool.Node> head;

// // // Life-cycle (constructors, factory methods)

private LockFreePool() {
private LockFreePool(int serialization) {
super(serialization);
head = new AtomicReference<>();
}

Expand All @@ -276,7 +307,7 @@ private LockFreePool() {
*
* @return Shared pool instance
*/
public static BufferRecyclerPool shared() {
public static LockFreePool shared() {
return GLOBAL;
}

Expand All @@ -285,28 +316,27 @@ public static BufferRecyclerPool shared() {
*
* @return Newly constructed, non-shared pool instance
*/
public static BufferRecyclerPool nonShared() {
return new LockFreePool();
public static LockFreePool nonShared() {
return new LockFreePool(SERIALIZATION_NON_SHARED);
}

// // // JDK serialization support

/**
* To avoid serializing pool contents we made {@code head} {@code transient};
* to compensate, need to re-create proper instance using constructor.
* Make sure to re-link to global/shared or non-shared.
*/
protected Object readResolve() {
return new LockFreePool();
return _resolveToShared(GLOBAL).orElseGet(() -> nonShared());
}

// // // Actual API implementation

@Override
public BufferRecycler acquireBufferRecycler() {
return getBufferRecycler().withPool(this);
return _getRecycler().withPool(this);
}

private BufferRecycler getBufferRecycler() {
private BufferRecycler _getRecycler() {
// This simple lock free algorithm uses an optimistic compareAndSet strategy to
// populate the underlying linked list in a thread-safe way. However, under very
// heavy contention, the compareAndSet could fail multiple times, so it seems a
Expand Down Expand Up @@ -335,7 +365,7 @@ public void releaseBufferRecycler(BufferRecycler bufferRecycler) {
}
}

static class Node {
private static class Node {
final BufferRecycler value;
LockFreePool.Node next;

Expand All @@ -352,7 +382,7 @@ static class Node {
* {@link BufferRecycler} instances than its size configuration:
* the default size is {@link BoundedPool#DEFAULT_CAPACITY}.
*/
class BoundedPool implements BufferRecyclerPool
class BoundedPool extends StatefulImplBase
{
private static final long serialVersionUID = 1L;

Expand All @@ -362,13 +392,17 @@ class BoundedPool implements BufferRecyclerPool
*/
public final static int DEFAULT_CAPACITY = 100;

private static final BufferRecyclerPool GLOBAL = new BoundedPool(DEFAULT_CAPACITY);
private static final BoundedPool GLOBAL = new BoundedPool(SERIALIZATION_SHARED);

private final transient Queue<BufferRecycler> pool;
private final transient ArrayBlockingQueue<BufferRecycler> pool;

private final transient int capacity;

// // // Life-cycle (constructors, factory methods)

protected BoundedPool(int capacity) {
protected BoundedPool(int capacityAsId) {
super(capacityAsId);
capacity = (capacityAsId <= 0) ? DEFAULT_CAPACITY : capacityAsId;
pool = new ArrayBlockingQueue<>(capacity);
}

Expand All @@ -379,44 +413,53 @@ protected BoundedPool(int capacity) {
*
* @return Shared pool instance
*/
public static BufferRecyclerPool shared() {
public static BoundedPool shared() {
return GLOBAL;
}

/**
* Accessor for creating and returning a new, non-shared pool instance.
*
* @param capacity Maximum capacity of the pool: must be positive number above zero.
*
* @return Newly constructed, non-shared pool instance
*/
public static BufferRecyclerPool nonShared(int capacity) {
public static BoundedPool nonShared(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException("capacity must be > 0, was: "+capacity);
}
return new BoundedPool(capacity);
}

// // // JDK serialization support

/**
* To avoid serializing pool contents we made {@code pool} {@code transient};
* to compensate, need to re-create proper instance using constructor.
* Make sure to re-link to global/shared or non-shared.
*/
protected Object readResolve() {
return GLOBAL;
return _resolveToShared(GLOBAL).orElseGet(() -> nonShared(_serialization));
}

// // // Actual API implementation

@Override
public BufferRecycler acquireBufferRecycler() {
return getBufferRecycler().withPool(this);
}

private BufferRecycler getBufferRecycler() {
BufferRecycler bufferRecycler = pool.poll();
return bufferRecycler != null ? bufferRecycler : new BufferRecycler();
if (bufferRecycler == null) {
bufferRecycler = new BufferRecycler();
}
return bufferRecycler.withPool(this);
}

@Override
public void releaseBufferRecycler(BufferRecycler bufferRecycler) {
pool.offer(bufferRecycler);
}

// // // Other methods

public int capacity() {
return capacity;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class BufferRecyclers
/**
* System property that is checked to see if recycled buffers (see {@link BufferRecycler})
* should be tracked, for purpose of forcing release of all such buffers, typically
* during major classloading.
* during major garbage-collection.
*
* @since 2.9.6
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,26 +119,33 @@ public void testRecyclerPools() throws Exception
_testRecyclerPoolGlobal(BufferRecyclerPool.nonRecyclingPool());
_testRecyclerPoolGlobal(BufferRecyclerPool.threadLocalPool());

// Then non-shared pool implementations
_testRecyclerPoolGlobal(BufferRecyclerPool.ConcurrentDequePool.shared());
_testRecyclerPoolGlobal(BufferRecyclerPool.LockFreePool.shared());
BufferRecyclerPool.BoundedPool bounded =
_testRecyclerPoolGlobal(BufferRecyclerPool.BoundedPool.shared());
assertEquals(BufferRecyclerPool.BoundedPool.DEFAULT_CAPACITY, bounded.capacity());

_testRecyclerPoolNonShared(BufferRecyclerPool.ConcurrentDequePool.nonShared());
_testRecyclerPoolNonShared(BufferRecyclerPool.LockFreePool.nonShared());

// !!! TODO: Should test that shared/global singleton pools remained
// as global/shared too; but not yet implemented
bounded = _testRecyclerPoolNonShared(BufferRecyclerPool.BoundedPool.nonShared(250));
assertEquals(250, bounded.capacity());
}

private void _testRecyclerPoolGlobal(BufferRecyclerPool pool) throws Exception {
private <T extends BufferRecyclerPool> T _testRecyclerPoolGlobal(T pool) throws Exception {
byte[] stuff = jdkSerialize(pool);
BufferRecyclerPool result = jdkDeserialize(stuff);
T result = jdkDeserialize(stuff);
assertNotNull(result);
assertSame(pool.getClass(), result.getClass());
return result;
}

private void _testRecyclerPoolNonShared(BufferRecyclerPool pool) throws Exception {
private <T extends BufferRecyclerPool> T _testRecyclerPoolNonShared(T pool) throws Exception {
byte[] stuff = jdkSerialize(pool);
BufferRecyclerPool result = jdkDeserialize(stuff);
T result = jdkDeserialize(stuff);
assertNotNull(result);
assertEquals(pool.getClass(), result.getClass());
assertNotSame(pool, result);
return result;
}

/*
Expand Down

0 comments on commit eb2461b

Please sign in to comment.