KAFKA-16374; High watermark updates should have a higher priority (apache#15534)

When the group coordinator is under heavy load, the current mechanism for releasing pending events on a high watermark update, which consists of pushing an event to the end of the queue, is inefficient because pending events pay the cost of the queue twice: a first time for the handling of the original event and a second time for the handling of the high watermark update. This patch changes the logic to push the high watermark update event to the front of the queue, so that pending events are released as soon as possible.
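
In outline, the fix replaces "enqueue at the back" with "coalesce and enqueue at the front" for high watermark updates. A minimal standalone sketch of the idea (illustrative names only, not the actual Kafka classes):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative sketch: watermark updates are coalesced into at most one
    // pending event, and that event jumps ahead of queued work.
    class HwmQueueSketch {
        private static final long NO_OFFSET = -1L;

        private final Deque<Runnable> queue = new ArrayDeque<>();
        // Latest unprocessed high watermark, or NO_OFFSET if none is pending.
        private final AtomicLong lastHighWatermark = new AtomicLong(NO_OFFSET);

        synchronized void onHighWatermarkUpdated(long offset) {
            // Enqueue a new event only if none is pending; a pending event
            // will observe this newer value when it finally runs.
            if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
                queue.addFirst(() -> {
                    long hwm = lastHighWatermark.getAndSet(NO_OFFSET);
                    System.out.println("Applying high watermark " + hwm);
                });
            }
        }

        synchronized void onWrite(Runnable writeEvent) {
            queue.addLast(writeEvent); // regular events keep FIFO order
        }
    }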

Reviewers: Jeff Kim <[email protected]>, Justine Olshan <[email protected]>
dajac authored Mar 25, 2024
1 parent 7b2fc46 commit be17df6
Showing 7 changed files with 273 additions and 79 deletions.
@@ -24,10 +24,18 @@
 public interface CoordinatorEventProcessor extends AutoCloseable {
 
     /**
-     * Enqueues a new {@link CoordinatorEvent}.
+     * Enqueues a new {@link CoordinatorEvent} at the end of the processor.
      *
      * @param event The event.
      * @throws RejectedExecutionException If the event processor is closed.
      */
-    void enqueue(CoordinatorEvent event) throws RejectedExecutionException;
+    void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException;
+
+    /**
+     * Enqueues a new {@link CoordinatorEvent} at the front of the processor.
+     *
+     * @param event The event.
+     * @throws RejectedExecutionException If the event processor is closed.
+     */
+    void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException;
 }
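
The contract of the two methods is deque-like. For a rough feel, a toy single-threaded implementation might look as follows (a sketch only; the real implementation is the multi-threaded processor further down):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.RejectedExecutionException;

    // Toy stand-in: one deque, no threads, no per-key fairness.
    final class InlineEventProcessor implements CoordinatorEventProcessor {
        private final Deque<CoordinatorEvent> deque = new ArrayDeque<>();
        private volatile boolean closed = false;

        @Override
        public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException {
            if (closed) throw new RejectedExecutionException("Processor is closed.");
            deque.addLast(event);
        }

        @Override
        public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException {
            if (closed) throw new RejectedExecutionException("Processor is closed.");
            deque.addFirst(event);
        }

        @Override
        public void close() {
            closed = true;
        }
    }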
@@ -51,6 +51,7 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -356,7 +357,7 @@ public void run() {
 
                     log.debug("Scheduling write event {} for timer {}.", event.name, key);
                     try {
-                        enqueue(event);
+                        enqueueLast(event);
                     } catch (NotCoordinatorException ex) {
                         log.info("Failed to enqueue write event {} for timer {} because the runtime is closed. Ignoring it.",
                             event.name, key);
@@ -438,6 +439,12 @@ class CoordinatorContext {
          */
         SnapshottableCoordinator<S, U> coordinator;
 
+        /**
+         * The high watermark listener registered to all the partitions
+         * backing the coordinators.
+         */
+        HighWatermarkListener highWatermarklistener;
+
         /**
          * Constructor.
          *
@@ -495,6 +502,7 @@ private void transitionTo(
 
                 case ACTIVE:
                     state = CoordinatorState.ACTIVE;
+                    highWatermarklistener = new HighWatermarkListener();
                     partitionWriter.registerListener(tp, highWatermarklistener);
                     coordinator.onLoaded(metadataImage);
                     break;
@@ -520,7 +528,10 @@
          * Unloads the coordinator.
          */
         private void unload() {
-            partitionWriter.deregisterListener(tp, highWatermarklistener);
+            if (highWatermarklistener != null) {
+                partitionWriter.deregisterListener(tp, highWatermarklistener);
+                highWatermarklistener = null;
+            }
             timer.cancelAll();
             deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
             if (coordinator != null) {
@@ -1179,6 +1190,23 @@ public String toString() {
          * backing the coordinator are updated.
          */
         class HighWatermarkListener implements PartitionWriter.Listener {
+
+            private static final long NO_OFFSET = -1L;
+
+            /**
+             * The atomic long is used to store the last and unprocessed high watermark
+             * received from the partition. The atomic value is replaced by -1L when
+             * the high watermark is taken to update the context.
+             */
+            private final AtomicLong lastHighWatermark = new AtomicLong(NO_OFFSET);
+
+            /**
+             * @return The last high watermark received, or NO_OFFSET if none is pending.
+             */
+            public long lastHighWatermark() {
+                return lastHighWatermark.get();
+            }
+
             /**
              * Updates the high watermark of the corresponding coordinator.
              *
@@ -1191,30 +1219,37 @@ public void onHighWatermarkUpdated(
                 long offset
             ) {
                 log.debug("High watermark of {} incremented to {}.", tp, offset);
-                scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-                    CoordinatorContext context = coordinators.get(tp);
-                    if (context != null) {
-                        context.lock.lock();
-                        try {
-                            if (context.state == CoordinatorState.ACTIVE) {
-                                // The updated high watermark can be applied to the coordinator only if the coordinator
-                                // exists and is in the active state.
-                                log.debug("Updating high watermark of {} to {}.", tp, offset);
-                                context.coordinator.updateLastCommittedOffset(offset);
-                                context.deferredEventQueue.completeUpTo(offset);
-                                coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                            } else {
-                                log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-                                    tp, offset);
+                if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+                    // An event to apply the new high watermark is pushed to the front of the
+                    // queue only if the previous value was -1L. If it was not, it means that
+                    // there is already an event waiting to process the last value.
+                    enqueueFirst(new CoordinatorInternalEvent("HighWatermarkUpdate", tp, () -> {
+                        long newHighWatermark = lastHighWatermark.getAndSet(NO_OFFSET);
+
+                        CoordinatorContext context = coordinators.get(tp);
+                        if (context != null) {
+                            context.lock.lock();
+                            try {
+                                if (context.state == CoordinatorState.ACTIVE) {
+                                    // The updated high watermark can be applied to the coordinator only if the coordinator
+                                    // exists and is in the active state.
+                                    log.debug("Updating high watermark of {} to {}.", tp, newHighWatermark);
+                                    context.coordinator.updateLastCommittedOffset(newHighWatermark);
+                                    context.deferredEventQueue.completeUpTo(newHighWatermark);
+                                    coordinatorMetrics.onUpdateLastCommittedOffset(tp, newHighWatermark);
+                                } else {
+                                    log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
+                                        tp, newHighWatermark);
+                                }
+                            } finally {
+                                context.lock.unlock();
                             }
-                        } finally {
-                            context.lock.unlock();
+                        } else {
+                            log.debug("Ignored high watermark updated for {} to {} because the coordinator does not exist.",
+                                tp, newHighWatermark);
                         }
-                    } else {
-                        log.debug("Ignored high watermark updated for {} to {} because the coordinator does not exist.",
-                            tp, offset);
-                    }
-                });
+                    }));
+                }
             }
         }
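
The getAndSet pair above carries the coalescing guarantee: a burst of updates enqueues at most one event, and that event reads the newest offset. A small standalone check of the invariant (hypothetical demo code, not part of the patch):

    import java.util.concurrent.atomic.AtomicLong;

    public class CoalescingDemo {
        private static final long NO_OFFSET = -1L;

        public static void main(String[] args) {
            AtomicLong pending = new AtomicLong(NO_OFFSET);
            int enqueued = 0;

            // Three updates arrive before the queued event gets to run...
            for (long offset : new long[] {10L, 11L, 12L}) {
                if (pending.getAndSet(offset) == NO_OFFSET) enqueued++;
            }

            // ...yet only one event was enqueued, and it observes the latest value.
            System.out.println("events enqueued: " + enqueued);                    // 1
            System.out.println("offset applied: " + pending.getAndSet(NO_OFFSET)); // 12
        }
    }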

@@ -1263,12 +1298,6 @@ public void onHighWatermarkUpdated(
      */
     private final PartitionWriter<U> partitionWriter;
 
-    /**
-     * The high watermark listener registered to all the partitions
-     * backing the coordinators.
-     */
-    private final HighWatermarkListener highWatermarklistener;
-
     /**
      * The coordinator loader used by the runtime.
      */
@@ -1335,7 +1364,6 @@ private CoordinatorRuntime(
         this.coordinators = new ConcurrentHashMap<>();
         this.processor = processor;
         this.partitionWriter = partitionWriter;
-        this.highWatermarklistener = new HighWatermarkListener();
         this.loader = loader;
         this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
         this.runtimeMetrics = runtimeMetrics;
@@ -1353,14 +1381,28 @@ private void throwIfNotRunning() {
     }
 
     /**
-     * Enqueues a new event.
+     * Enqueues a new event at the end of the processing queue.
      *
      * @param event The event.
      * @throws NotCoordinatorException If the event processor is closed.
      */
+    private void enqueueLast(CoordinatorEvent event) {
+        try {
+            processor.enqueueLast(event);
+        } catch (RejectedExecutionException ex) {
+            throw new NotCoordinatorException("Can't accept an event because the processor is closed", ex);
+        }
+    }
+
+    /**
+     * Enqueues a new event at the front of the processing queue.
+     *
+     * @param event The event.
+     * @throws NotCoordinatorException If the event processor is closed.
+     */
-    private void enqueue(CoordinatorEvent event) {
+    private void enqueueFirst(CoordinatorEvent event) {
         try {
-            processor.enqueue(event);
+            processor.enqueueFirst(event);
         } catch (RejectedExecutionException ex) {
             throw new NotCoordinatorException("Can't accept an event because the processor is closed", ex);
         }
@@ -1442,7 +1484,7 @@ public <T> CompletableFuture<T> scheduleWriteOperation(
         throwIfNotRunning();
         log.debug("Scheduled execution of write operation {}.", name);
         CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(name, tp, timeout, op);
-        enqueue(event);
+        enqueueLast(event);
         return event.future;
     }

@@ -1518,7 +1560,7 @@ public <T> CompletableFuture<T> scheduleTransactionalWriteOperation(
                 timeout,
                 op
             );
-            enqueue(event);
+            enqueueLast(event);
             return event.future;
         });
     }
@@ -1557,7 +1599,7 @@ public CompletableFuture<Void> scheduleTransactionCompletion(
             result,
             timeout
         );
-        enqueue(event);
+        enqueueLast(event);
         return event.future;
     }

@@ -1581,7 +1623,7 @@ public <T> CompletableFuture<T> scheduleReadOperation(
         throwIfNotRunning();
         log.debug("Scheduled execution of read operation {}.", name);
         CoordinatorReadEvent<T> event = new CoordinatorReadEvent<>(name, tp, op);
-        enqueue(event);
+        enqueueLast(event);
         return event.future;
     }

@@ -1622,7 +1664,7 @@ private void scheduleInternalOperation(
         Runnable op
     ) {
         log.debug("Scheduled execution of internal operation {}.", name);
-        enqueue(new CoordinatorInternalEvent(name, tp, op));
+        enqueueLast(new CoordinatorInternalEvent(name, tp, op));
     }
 
     /**
@@ -18,12 +18,12 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
@@ -60,7 +60,7 @@ public interface Event<K> {
     /**
      * The map of queues keyed by K.
      */
-    private final Map<K, Queue<T>> queues;
+    private final Map<K, Deque<T>> queues;
 
     /**
      * The list of available keys. Keys in this list can
@@ -110,25 +110,51 @@ public EventAccumulator(
     }
 
     /**
-     * Adds an {@link Event} to the queue.
+     * Adds an {@link Event} at the end of the queue.
      *
      * @param event An {@link Event}.
      */
-    public void add(T event) throws RejectedExecutionException {
+    public void addLast(T event) throws RejectedExecutionException {
         lock.lock();
         try {
             if (closed) throw new RejectedExecutionException("Can't accept an event because the accumulator is closed.");
 
             K key = event.key();
-            Queue<T> queue = queues.get(key);
+            Deque<T> queue = queues.get(key);
             if (queue == null) {
                 queue = new LinkedList<>();
                 queues.put(key, queue);
                 if (!inflightKeys.contains(key)) {
                     addAvailableKey(key);
                 }
             }
-            queue.add(event);
+            queue.addLast(event);
             size++;
         } finally {
             lock.unlock();
         }
     }
+
+    /**
+     * Adds an {@link Event} at the front of the queue.
+     *
+     * @param event An {@link Event}.
+     */
+    public void addFirst(T event) throws RejectedExecutionException {
+        lock.lock();
+        try {
+            if (closed) throw new RejectedExecutionException("Can't accept an event because the accumulator is closed.");
+
+            K key = event.key();
+            Deque<T> queue = queues.get(key);
+            if (queue == null) {
+                queue = new LinkedList<>();
+                queues.put(key, queue);
+                if (!inflightKeys.contains(key)) {
+                    addAvailableKey(key);
+                }
+            }
+            queue.addFirst(event);
+            size++;
+        } finally {
+            lock.unlock();
@@ -147,7 +173,7 @@ public T poll() {
         K key = randomKey();
         if (key == null) return null;
 
-        Queue<T> queue = queues.get(key);
+        Deque<T> queue = queues.get(key);
         T event = queue.poll();
 
         if (queue.isEmpty()) queues.remove(key);
@@ -181,7 +207,7 @@ public T take() {
 
         if (key == null) return null;
 
-        Queue<T> queue = queues.get(key);
+        Deque<T> queue = queues.get(key);
         T event = queue.poll();
 
         if (queue.isEmpty()) queues.remove(key);
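
Taken together, addLast and addFirst preserve per-key FIFO order for regular events while letting a watermark event cut the line. A usage sketch, assuming the accumulator's no-arg constructor and an illustrative String-keyed event type (hypothetical names):

    public class AccumulatorOrderSketch {
        // Hypothetical event type; the accumulator only requires a key().
        record TopicEvent(String key, String name) implements EventAccumulator.Event<String> {}

        public static void main(String[] args) {
            EventAccumulator<String, TopicEvent> accumulator = new EventAccumulator<>();
            accumulator.addLast(new TopicEvent("tp-0", "write-1"));
            accumulator.addLast(new TopicEvent("tp-0", "write-2"));
            accumulator.addFirst(new TopicEvent("tp-0", "hwm-update"));

            // The per-key deque now hands out the watermark event first,
            // then write-1 and write-2 in their original order.
            System.out.println(accumulator.poll().name()); // hwm-update
        }
    }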
@@ -198,14 +198,25 @@ private void recordPollEndTime(long pollEndMs) {
     }
 
     /**
-     * Enqueues a new {@link CoordinatorEvent}.
+     * Enqueues a new {@link CoordinatorEvent} at the end of the processor.
      *
      * @param event The event.
      * @throws RejectedExecutionException If the event processor is closed.
      */
     @Override
-    public void enqueue(CoordinatorEvent event) throws RejectedExecutionException {
-        accumulator.add(event);
+    public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException {
+        accumulator.addLast(event);
     }
+
+    /**
+     * Enqueues a new {@link CoordinatorEvent} at the front of the processor.
+     *
+     * @param event The event.
+     * @throws RejectedExecutionException If the event processor is closed.
+     */
+    @Override
+    public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException {
+        accumulator.addFirst(event);
+    }
 
     /**
(Diffs for the remaining 3 of the 7 changed files were not loaded.)
