Skip to content

Commit

Permalink
8343394: Make MemorySessionImpl.state a stable field
Browse files Browse the repository at this point in the history
Co-authored-by: Maurizio Cimadamore <[email protected]>
Reviewed-by: mcimadamore, jvernee
  • Loading branch information
Quan Anh Mai and mcimadamore committed Nov 7, 2024
1 parent d2b681d commit 1d117f6
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ final class ConfinedSession extends MemorySessionImpl {

private int asyncReleaseCount = 0;

static final VarHandle ASYNC_RELEASE_COUNT= MhUtil.findVarHandle(
MethodHandles.lookup(), "asyncReleaseCount", int.class);
static final VarHandle ASYNC_RELEASE_COUNT= MhUtil.findVarHandle(MethodHandles.lookup(), "asyncReleaseCount", int.class);

public ConfinedSession(Thread owner) {
super(owner, new ConfinedResourceList());
Expand All @@ -52,17 +51,17 @@ public ConfinedSession(Thread owner) {
@ForceInline
public void acquire0() {
checkValidState();
if (state == MAX_FORKS) {
if (acquireCount == MAX_FORKS) {
throw tooManyAcquires();
}
state++;
acquireCount++;
}

@Override
@ForceInline
public void release0() {
if (Thread.currentThread() == owner) {
state--;
acquireCount--;
} else {
// It is possible to end up here in two cases: this session was kept alive by some other confined session
// which is implicitly released (in which case the release call comes from the cleaner thread). Or,
Expand All @@ -75,11 +74,11 @@ public void release0() {
void justClose() {
checkValidState();
int asyncCount = (int)ASYNC_RELEASE_COUNT.getVolatile(this);
if ((state == 0 && asyncCount == 0)
|| ((state - asyncCount) == 0)) {
int acquire = acquireCount - asyncCount;
if (acquire == 0) {
state = CLOSED;
} else {
throw alreadyAcquired(state - asyncCount);
throw alreadyAcquired(acquire);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ non-sealed class GlobalSession extends MemorySessionImpl {

public GlobalSession() {
super(null, null);
this.state = NONCLOSEABLE;
}

@Override
Expand All @@ -50,11 +51,6 @@ public void release0() {
// do nothing
}

@Override
public boolean isCloseable() {
return false;
}

@Override
@ForceInline
public void acquire0() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ final class ImplicitSession extends SharedSession {

public ImplicitSession(Cleaner cleaner) {
super();
this.state = NONCLOSEABLE;
cleaner.register(this, resourceList);
}

Expand All @@ -55,11 +56,6 @@ public void acquire0() {
// do nothing
}

@Override
public boolean isCloseable() {
return false;
}

@Override
public void justClose() {
throw nonCloseable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import jdk.internal.misc.ScopedMemoryAccess;
import jdk.internal.invoke.MhUtil;
import jdk.internal.vm.annotation.ForceInline;
import jdk.internal.vm.annotation.Stable;

/**
* This class manages the temporal bounds associated with a memory segment as well
Expand All @@ -55,11 +56,19 @@
public abstract sealed class MemorySessionImpl
implements Scope
permits ConfinedSession, GlobalSession, SharedSession {

/**
* The value of the {@code state} of a {@code MemorySessionImpl}. The only possible transition
* is OPEN -> CLOSED. As a result, the states CLOSED and NONCLOSEABLE are stable. This allows
* us to annotate {@code state} with {@link Stable} and elide liveness check on non-closeable
* constant scopes, such as {@code GLOBAL_SESSION}.
*/
static final int OPEN = 0;
static final int CLOSED = -1;
static final int NONCLOSEABLE = 1;

static final VarHandle STATE = MhUtil.findVarHandle(
MethodHandles.lookup(), "state", int.class);
static final VarHandle STATE = MhUtil.findVarHandle(MethodHandles.lookup(), "state", int.class);
static final VarHandle ACQUIRE_COUNT = MhUtil.findVarHandle(MethodHandles.lookup(), "acquireCount", int.class);

static final int MAX_FORKS = Integer.MAX_VALUE;

Expand All @@ -70,7 +79,11 @@ public abstract sealed class MemorySessionImpl

final ResourceList resourceList;
final Thread owner;
int state = OPEN;

@Stable
int state;

int acquireCount;

public Arena asArena() {
return new ArenaImpl(this);
Expand Down Expand Up @@ -214,8 +227,8 @@ protected Object clone() throws CloneNotSupportedException {
throw new CloneNotSupportedException();
}

public boolean isCloseable() {
return true;
public final boolean isCloseable() {
return state <= OPEN;
}

/**
Expand Down
39 changes: 26 additions & 13 deletions src/java.base/share/classes/jdk/internal/foreign/SharedSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ sealed class SharedSession extends MemorySessionImpl permits ImplicitSession {

private static final ScopedMemoryAccess SCOPED_MEMORY_ACCESS = ScopedMemoryAccess.getScopedMemoryAccess();

private static final int CLOSED_ACQUIRE_COUNT = -1;

SharedSession() {
super(null, new SharedResourceList());
}
Expand All @@ -53,40 +55,51 @@ sealed class SharedSession extends MemorySessionImpl permits ImplicitSession {
public void acquire0() {
int value;
do {
value = (int) STATE.getVolatile(this);
if (value < OPEN) {
value = (int) ACQUIRE_COUNT.getVolatile(this);
if (value < 0) {
//segment is not open!
throw alreadyClosed();
throw sharedSessionAlreadyClosed();
} else if (value == MAX_FORKS) {
//overflow
throw tooManyAcquires();
}
} while (!STATE.compareAndSet(this, value, value + 1));
} while (!ACQUIRE_COUNT.compareAndSet(this, value, value + 1));
}

@Override
@ForceInline
public void release0() {
int value;
do {
value = (int) STATE.getVolatile(this);
if (value <= OPEN) {
value = (int) ACQUIRE_COUNT.getVolatile(this);
if (value <= 0) {
//cannot get here - we can't close segment twice
throw alreadyClosed();
throw sharedSessionAlreadyClosed();
}
} while (!STATE.compareAndSet(this, value, value - 1));
} while (!ACQUIRE_COUNT.compareAndSet(this, value, value - 1));
}

void justClose() {
int prevState = (int) STATE.compareAndExchange(this, OPEN, CLOSED);
if (prevState < 0) {
throw alreadyClosed();
} else if (prevState != OPEN) {
throw alreadyAcquired(prevState);
int acquireCount = (int) ACQUIRE_COUNT.compareAndExchange(this, 0, CLOSED_ACQUIRE_COUNT);
if (acquireCount < 0) {
throw sharedSessionAlreadyClosed();
} else if (acquireCount > 0) {
throw alreadyAcquired(acquireCount);
}

STATE.setVolatile(this, CLOSED);
SCOPED_MEMORY_ACCESS.closeScope(this, ALREADY_CLOSED);
}

private IllegalStateException sharedSessionAlreadyClosed() {
// To avoid the situation where a scope fails to be acquired or closed but still reports as
// alive afterward, we wait for the state to change before throwing the exception
while ((int) STATE.getVolatile(this) == OPEN) {
Thread.onSpinWait();
}
return alreadyClosed();
}

/**
* A shared resource list; this implementation has to handle add vs. add races, as well as add vs. cleanup races.
*/
Expand Down
77 changes: 70 additions & 7 deletions test/jdk/java/foreign/TestMemorySession.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -28,19 +28,18 @@
*/

import java.lang.foreign.Arena;

import jdk.internal.foreign.MemorySessionImpl;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.*;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import jdk.internal.foreign.MemorySessionImpl;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.*;

public class TestMemorySession {

Expand Down Expand Up @@ -319,6 +318,70 @@ public void testIsCloseableBy(ArenaSupplier arenaSupplier) {
assertEquals(sessionImpl.isCloseableBy(otherThread), isCloseableByOther);
}

/**
* Test that a thread failing to acquire a scope will not observe it as alive afterwards.
*/
@Test
public void testAcquireCloseRace() throws InterruptedException {
int iteration = 1000;
AtomicInteger lock = new AtomicInteger();
boolean[] result = new boolean[1];
lock.set(-2);
MemorySessionImpl[] scopes = new MemorySessionImpl[iteration];
for (int i = 0; i < iteration; i++) {
scopes[i] = MemorySessionImpl.toMemorySession(Arena.ofShared());
}

// This thread tries to close the scopes
Thread t1 = new Thread(() -> {
for (int i = 0; i < iteration; i++) {
MemorySessionImpl scope = scopes[i];
while (true) {
try {
scope.close();
break;
} catch (IllegalStateException e) {}
}
// Keep the 2 threads operating on the same scope
int k = lock.getAndAdd(1) + 1;
while (k != i * 2) {
Thread.onSpinWait();
k = lock.get();
}
}
});

// This thread tries to acquire the scopes, then check if it is alive after an acquire failure
Thread t2 = new Thread(() -> {
for (int i = 0; i < iteration; i++) {
MemorySessionImpl scope = scopes[i];
while (true) {
try {
scope.acquire0();
} catch (IllegalStateException e) {
if (scope.isAlive()) {
result[0] = true;
}
break;
}
scope.release0();
}
// Keep the 2 threads operating on the same scope
int k = lock.getAndAdd(1) + 1;
while (k != i * 2) {
Thread.onSpinWait();
k = lock.get();
}
}
});

t1.start();
t2.start();
t1.join();
t2.join();
assertFalse(result[0]);
}

private void waitSomeTime() {
try {
Thread.sleep(10);
Expand Down
Loading

0 comments on commit 1d117f6

Please sign in to comment.