Skip to content

Commit

Permalink
Reproduction of and fix for #81
Browse files Browse the repository at this point in the history
* Reproduction of #81
* Make lock acquisition atomic, Fixes #81
  • Loading branch information
nicktindall authored Oct 27, 2021
1 parent 0bdef18 commit 6a7883d
Show file tree
Hide file tree
Showing 11 changed files with 495 additions and 390 deletions.
47 changes: 12 additions & 35 deletions affinity/src/main/java/net/openhft/affinity/LockCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* @author Rob Austin.
*/
enum LockCheck {
public enum LockCheck {
; // none

private static final Logger LOGGER = LoggerFactory.getLogger(LockCheck.class);
Expand All @@ -38,7 +38,7 @@ enum LockCheck {

private static final LockChecker lockChecker = FileLockBasedLockChecker.getInstance();

static long getPID() {
public static long getPID() {
String processName =
java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
return Long.parseLong(processName.split("@")[0]);
Expand All @@ -52,30 +52,14 @@ public static boolean isCpuFree(int cpu) {
if (!canOSSupportOperation())
return true;

if (isLockFree(cpu)) {
return true;
} else {
int currentProcess = 0;
try {
currentProcess = getProcessForCpu(cpu);
} catch (RuntimeException | IOException e) {
LOGGER.warn("Failed to determine process on cpu " + cpu, e);
e.printStackTrace();
return true;
}
if (!isProcessRunning(currentProcess)) {
lockChecker.releaseLock(cpu);
return true;
}
return false;
}
return isLockFree(cpu);
}

static void replacePid(int cpu, long processID) throws IOException {
storePid(processID, cpu);
static boolean replacePid(int cpu, long processID) throws IOException {
return storePid(processID, cpu);
}

static boolean isProcessRunning(long pid) {
public static boolean isProcessRunning(long pid) {
if (canOSSupportOperation())
return new File("/proc/" + pid).exists();
else
Expand All @@ -86,17 +70,15 @@ static boolean isProcessRunning(long pid) {
* stores the pid in a file, named by the core, the pid is written to the file with the date
* below
*/
private synchronized static void storePid(long processID, int cpu) throws IOException {
if (!lockChecker.obtainLock(cpu, Long.toString(processID))) {
throw new IOException(String.format("Cannot obtain file lock for cpu %d", cpu));
}
private synchronized static boolean storePid(long processID, int cpu) throws IOException {
return lockChecker.obtainLock(cpu, Long.toString(processID));
}

private synchronized static boolean isLockFree(int id) {
return lockChecker.isLockFree(id);
}

static int getProcessForCpu(int core) throws IOException {
public static int getProcessForCpu(int core) throws IOException {
String meta = lockChecker.getMetaInfo(core);

if (meta != null && !meta.isEmpty()) {
Expand All @@ -109,15 +91,10 @@ static int getProcessForCpu(int core) throws IOException {
return EMPTY_PID;
}

static void updateCpu(int cpu) {
static boolean updateCpu(int cpu) throws IOException {
if (!canOSSupportOperation())
return;
try {
replacePid(cpu, getPID());
} catch (IOException e) {
LOGGER.warn("Failed to update lock file for cpu " + cpu, e);
e.printStackTrace();
}
return true;
return replacePid(cpu, getPID());
}

public static void releaseLock(int cpu) {
Expand Down
41 changes: 30 additions & 11 deletions affinity/src/main/java/net/openhft/affinity/LockInventory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;

Expand Down Expand Up @@ -69,9 +70,24 @@ private static boolean isAnyCpu(final int cpuId) {
return cpuId == AffinityLock.ANY_CPU;
}

private static void updateLockForCurrentThread(final boolean bind, final AffinityLock al, final boolean b) {
al.assignCurrentThread(bind, b);
LockCheck.updateCpu(al.cpuId());
/**
* Update the lock for the current thread
*
* @param bind Whether to also bind the thread to the core
* @param al The lock to update
* @param wholeCore Whether to bind the whole core
* @return true if the lock was acquired, false otherwise
*/
private static boolean updateLockForCurrentThread(final boolean bind, final AffinityLock al, final boolean wholeCore) {
try {
if (LockCheck.updateCpu(al.cpuId())) {
al.assignCurrentThread(bind, wholeCore);
return true;
}
} catch (IOException e) {
LOGGER.warn("Error occurred acquiring lock", e);
}
return false;
}

public final synchronized CpuLayout getCpuLayout() {
Expand Down Expand Up @@ -106,8 +122,9 @@ public final synchronized AffinityLock acquireLock(boolean bind, int cpuId, Affi
final boolean specificCpuRequested = !isAnyCpu(cpuId);
if (specificCpuRequested && cpuId != 0) {
final AffinityLock required = logicalCoreLocks[cpuId];
if (required.canReserve(true) && anyStrategyMatches(cpuId, cpuId, strategies)) {
updateLockForCurrentThread(bind, required, false);
if (required.canReserve(true)
&& anyStrategyMatches(cpuId, cpuId, strategies)
&& updateLockForCurrentThread(bind, required, false)) {
return required;
}
LOGGER.warn("Unable to acquire lock on CPU {} for thread {}, trying to find another CPU",
Expand All @@ -119,8 +136,9 @@ public final synchronized AffinityLock acquireLock(boolean bind, int cpuId, Affi
// if you have only one core, this library is not appropriate in any case.
for (int i = logicalCoreLocks.length - 1; i > 0; i--) {
AffinityLock al = logicalCoreLocks[i];
if (al.canReserve(false) && (isAnyCpu(cpuId) || strategy.matches(cpuId, al.cpuId()))) {
updateLockForCurrentThread(bind, al, false);
if (al.canReserve(false)
&& (isAnyCpu(cpuId) || strategy.matches(cpuId, al.cpuId()))
&& updateLockForCurrentThread(bind, al, false)) {
return al;
}
}
Expand All @@ -136,8 +154,8 @@ public final synchronized AffinityLock tryAcquireLock(boolean bind, int cpuId) {
return null;

final AffinityLock required = logicalCoreLocks[cpuId];
if (required.canReserve(true)) {
updateLockForCurrentThread(bind, required, false);
if (required.canReserve(true)
&& updateLockForCurrentThread(bind, required, false)) {
return required;
}

Expand All @@ -156,8 +174,9 @@ public final synchronized AffinityLock acquireCore(boolean bind, int cpuId, Affi
continue LOOP;

final AffinityLock al = als[0];
updateLockForCurrentThread(bind, al, true);
return al;
if (updateLockForCurrentThread(bind, al, true)) {
return al;
}
}
}

Expand Down

This file was deleted.

Loading

0 comments on commit 6a7883d

Please sign in to comment.