Skip to content

Commit

Permalink
Introducing the FC-Heap concurrent heap implementation based on
Browse files Browse the repository at this point in the history
flat-combining.
  • Loading branch information
nkallima committed Dec 10, 2024
1 parent c176986 commit 54e99ea
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 1 deletion.
78 changes: 78 additions & 0 deletions benchmarks/fcheapbench.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <stdint.h>

#include <config.h>
#include <primitives.h>
#include <fastrand.h>
#include <threadtools.h>
#include <fcheap.h>
#include <barrier.h>
#include <bench_args.h>

FCHeapStruct *object_struct CACHE_ALIGN;
int64_t d1 CACHE_ALIGN, d2;
SynchBarrier bar CACHE_ALIGN;
SynchBenchArgs bench_args CACHE_ALIGN;

inline static void *Execute(void* Arg) {
FCHeapThreadState *th_state;
long i, rnum;
volatile int j;
long id = (long) Arg;

synchFastRandomSetSeed(id + 1);
th_state = synchGetAlignedMemory(CACHE_LINE_SIZE, sizeof(FCHeapThreadState));
FCHeapThreadStateInit(object_struct, th_state, (int)id);
if (id == 0) {
for (i = 0; i < SYNCH_HEAP_INITIAL_SIZE/2; i++)
FCHeapInsert(object_struct, th_state, i, 0);
}
synchBarrierWait(&bar);
if (id == 0)
d1 = synchGetTimeMillis();

for (i = 0; i < bench_args.runs; i++) {
// perform an insert operation
FCHeapInsert(object_struct, th_state, bench_args.runs - i, id);
rnum = synchFastRandomRange(1, bench_args.max_work);
for (j = 0; j < rnum; j++)
;
// perform a delete min operation
SynchHeapElement h = FCHeapDeleteMin(object_struct, th_state, id);
if (h==SYNCH_HEAP_EMPTY)
fprintf(stderr, "DEBUG: Empty!\n");
rnum = synchFastRandomRange(1, bench_args.max_work);
for (j = 0; j < rnum; j++)
;
}

return NULL;
}

int main(int argc, char *argv[]) {
synchParseArguments(&bench_args, argc, argv);
object_struct = synchGetAlignedMemory(S_CACHE_LINE_SIZE, sizeof(FCHeapStruct));
FCHeapInit(object_struct, FCHEAP_TYPE_MIN, bench_args.nthreads);
synchBarrierSet(&bar, bench_args.nthreads);
synchStartThreadsN(bench_args.nthreads, Execute, bench_args.fibers_per_thread);
synchJoinThreadsN(bench_args.nthreads - 1);
d2 = synchGetTimeMillis();

printf("time: %d (ms)\tthroughput: %.2f (millions ops/sec)\t", (int) (d2 - d1), 2 * bench_args.runs * bench_args.nthreads/(1000.0*(d2 - d1)));
synchPrintStats(bench_args.nthreads, bench_args.total_runs);

#ifdef DEBUG
fprintf(stderr, "DEBUG: Object state: %lld\n", object_struct->heap.counter - SYNCH_HEAP_INITIAL_SIZE/2);
fprintf(stderr, "DEBUG: rounds: %ld\n", object_struct->heap.rounds);
fprintf(stderr, "DEBUG: initial_items: %lld\n", SYNCH_HEAP_INITIAL_SIZE/2);
fprintf(stderr, "DEBUG: remained_items: %ld\n", object_struct->state.items);
fprintf(stderr, "DEBUG: last_level_used: %u\n", object_struct->state.last_used_level);
fprintf(stderr, "DEBUG: last_used_level_pos: %u\n", object_struct->state.last_used_level_pos);
fprintf(stderr, "DEBUG: Checking heap state: %s\n", ((serialHeapClearAndValidation(&object_struct->state) == true) ? "VALID" : "INVALID"));
#endif

return 0;
}
23 changes: 23 additions & 0 deletions libconcurrent/concurrent/fcheap.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include <serialheap.h>
#include <fcheap.h>

void FCHeapInit(FCHeapStruct *heap_struct, uint32_t type, uint32_t nthreads) {
FCStructInit(&heap_struct->heap, nthreads);
serialHeapInit(&heap_struct->state, type);
}

void FCHeapThreadStateInit(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, int pid) {
FCThreadStateInit(&heap_struct->heap, &lobject_struct->thread_state, pid);
}

void FCHeapInsert(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, SynchHeapElement arg, int pid) {
FCApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, arg | SYNCH_HEAP_INSERT_OP, pid);
}

SynchHeapElement FCHeapDeleteMin(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, int pid) {
return FCApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, SYNCH_HEAP_DELETE_MIN_MAX_OP, pid);
}

SynchHeapElement FCHeapGetMin(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, int pid) {
return FCApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, SYNCH_HEAP_GET_MIN_MAX_OP, pid);
}
82 changes: 82 additions & 0 deletions libconcurrent/includes/fcheap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/// @file fcheap.h
/// @author Nikolaos D. Kallimanis
/// @brief This file exposes the API of the FCHeap, which is a concurrent heap object of fixed size.
/// The provided implementation uses the dynamic serial heap implementation provided by `serialheap.h` combined with the flat-combining object
/// provided by `fc.h`. This dynamic heap implementation is based on the static heap implementation provided
/// in https://github.com/ConcurrentDistributedLab/PersistentCombining repository.
/// An example of use of this API is provided in benchmarks/fcheapbench.c file.
///
/// @copyright Copyright (c) 2024
#ifndef _FCHEAP_H_
#define _FCHEAP_H_

#include <limits.h>
#include <fc.h>

#include <serialheap.h>


/// @brief The type of heap is max-heap
#define FCHEAP_TYPE_MIN SYNCH_HEAP_TYPE_MIN
/// @brief The type of heap is min-heap
#define FCHEAP_TYPE_MAX SYNCH_HEAP_TYPE_MAX

/// @brief FCHeapStruct stores the state of an instance of the FCHeap concurrent heap implementation.
/// FCHeapStruct should be initialized using the FCHeapStructInit function.
typedef struct FCHeapStruct {
/// @brief An instance of FC.
FCStruct heap CACHE_ALIGN;
/// @brief An instance of a serial heap implementation (see `serialheap.h`).
SerialHeapStruct state;
} FCHeapStruct;

/// @brief FCHeapThreadState stores each thread's local state for a single instance of FCHeap.
/// For each instance of FCHeap, a discrete instance of FCHeapThreadStateState should be used.
typedef struct FCHeapThreadState {
/// @brief A FCThreadState struct for the instance of FC.
FCThreadState thread_state;
} FCHeapThreadState;

/// @brief This function initializes an instance of the FCHeap concurrent heap implementation.
///
/// This function should be called once (by a single thread) before any other thread tries to
/// apply any operation on the heap object.
///
/// @param heap_struct A pointer to an instance of the FCHeap concurrent heap implementation.
/// @param type Identifies the type of heap (i.e. min-heap or max-heap). In case that the argument is equal to FCHEAP_TYPE_MIN,
/// the type of heap is min. In case that the argument is equal to FCHEAP_TYPE_MAX, the type of heap is max.
/// @param nthreads The number of threads that will use the FCHeap concurrent heap implementation.
void FCHeapInit(FCHeapStruct *heap_struct, uint32_t type, uint32_t nthreads);

/// @brief This function should be called once by every thread before it applies any operation to the FCHeap concurrent heap implementation.
///
/// @param heap_struct A pointer to an instance of the FCHeap concurrent heap implementation.
/// @param lobject_struct A pointer to thread's local state of FCHeap.
/// @param pid The pid of the calling thread.
void FCHeapThreadStateInit(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, int pid);

/// @brief This function inserts a new element with value `arg` to the heap.
///
/// @param heap_struct A pointer to an instance of the FCHeap concurrent heap implementation.
/// @param lobject_struct A pointer to thread's local state of FCHeap.
/// @param arg The value of the element that will be inserted in the heap.
/// @param pid The pid of the calling thread.
void FCHeapInsert(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, SynchHeapElement arg, int pid);

/// @brief This function removes the element of the heap that has the minimum value.
///
/// @param heap_struct A pointer to an instance of the FCHeap concurrent heap implementation.
/// @param lobject_struct A pointer to thread's local state of FCHeap.
/// @param pid The pid of the calling thread.
/// @return The value of the removed element. In case that the heap is empty `SYNCH_HEAP_EMPTY` is returned.
SynchHeapElement FCHeapDeleteMin(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, int pid);

/// @brief This function returns (without removing) the element of the heap that has the minimum value.
///
/// @param heap_struct A pointer to an instance of the FCHeap concurrent heap implementation.
/// @param lobject_struct A pointer to thread's local state of FCHeap.
/// @param pid The pid of the calling thread.
/// @return The value of the minimum element contained in the heap. In case that the heap is empty `SYNCH_HEAP_EMPTY` is returned.
SynchHeapElement FCHeapGetMin(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, int pid);

#endif
2 changes: 1 addition & 1 deletion validate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ COLOR_FAIL="[ \e[31mFAIL\e[39m ]"
declare -a uobjects=( "ccsynchbench.run" "dsmsynchbench.run" "hsynchbench.run" "oscibench.run" "simbench.run" "fcbench.run" "oyamabench.run" "mcsbench.run" "clhbench.run" "pthreadsbench.run" "fadbench.run")
declare -a queues=( "ccqueuebench.run" "clhqueuebench.run" "dsmqueuebench.run" "hqueuebench.run" "osciqueuebench.run" "simqueuebench.run" "fcqueuebench.run" "lcrqbench.run")
declare -a stacks=( "ccstackbench.run" "clhstackbench.run" "dsmstackbench.run" "hstackbench.run" "oscistackbench.run" "simstackbench.run" "fcstackbench.run")
declare -a heaps=( "ccheapbench.run" "dsmheapbench.run" "hheapbench.run")
declare -a heaps=( "ccheapbench.run" "dsmheapbench.run" "hheapbench.run" "fcheapbench.run")
declare -a hashtables=("clhhashbench.run" "dsmhashbench.run")

if [ "$1" = "--help" ] || [ "$1" = "-h" ]; then
Expand Down

0 comments on commit 54e99ea

Please sign in to comment.