Skip to content

Commit

Permalink
[Utils/ThreadPool] Renamed "action" to "task"
Browse files Browse the repository at this point in the history
- This is more explicit, as "action" is pretty vague
  - The concept of action remains for parallelization functions as the general action to execute; each independent actual job is a task

- Renamed the parallelization functions' "thread count" parameter to "task count"
  - This can effectively be any value, potentially higher than the number of available threads, which the previous name didn't imply
  • Loading branch information
Razakhel committed May 19, 2024
1 parent 81d4be3 commit 54a2eac
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 81 deletions.
6 changes: 3 additions & 3 deletions include/RaZ/Utils/ThreadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ class ThreadPool {
ThreadPool();
explicit ThreadPool(unsigned int threadCount);

void addAction(std::function<void()> action);
void addTask(std::function<void()> task);

~ThreadPool();

private:
std::vector<std::thread> m_threads {};
bool m_shouldStop = false;

std::mutex m_actionsMutex {};
std::mutex m_tasksMutex {};
std::condition_variable m_condVar {};
std::queue<std::function<void()>> m_actions {};
std::queue<std::function<void()>> m_tasks {};
};

} // namespace Raz
Expand Down
36 changes: 18 additions & 18 deletions include/RaZ/Utils/Threading.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,52 +71,52 @@ template <typename FuncT, typename... Args, typename ResultT = std::invoke_resul

/// Calls a function in parallel.
/// \note If using Emscripten this call will be synchronous, threads being unsupported with it for now.
/// \param action Action to be performed by each thread.
/// \param threadCount Amount of threads to start an instance on.
void parallelize(const std::function<void()>& action, unsigned int threadCount = getSystemThreadCount());
/// \param action Action to be performed in parallel.
/// \param taskCount Amount of tasks to start.
void parallelize(const std::function<void()>& action, unsigned int taskCount = getSystemThreadCount());

/// Calls the given functions in parallel, each on its own thread.
/// Calls the given functions in parallel.
/// \note If using Emscripten this call will be synchronous, threads being unsupported with it for now.
/// \param actions Actions to be performed in parallel.
void parallelize(std::initializer_list<std::function<void()>> actions);

/// Calls a function in parallel over an index range.
/// The indices are automatically split, giving a separate start/past-the-end index range to each thread.
/// The given index range is automatically split, providing a separate start/past-the-end index sub-range to each task.
/// \note If using Emscripten this call will be synchronous, threads being unsupported with it for now.
/// \tparam BegIndexT Type of the begin index.
/// \tparam EndIndexT Type of the end index.
/// \tparam FuncT Type of the action to be executed.
/// \param beginIndex Starting index of the whole range. Must be lower than the end index.
/// \param endIndex Past-the-last index of the whole range. Must be greater than the begin index.
/// \param action Action to be performed by each thread, taking an index range as boundaries.
/// \param threadCount Amount of threads to start an instance on.
/// \param action Action to be performed in parallel, taking an index range as boundaries.
/// \param taskCount Amount of tasks to start.
template <typename BegIndexT, typename EndIndexT, typename FuncT, typename = std::enable_if_t<std::is_integral_v<BegIndexT> && std::is_integral_v<EndIndexT>>>
void parallelize(BegIndexT beginIndex, EndIndexT endIndex, const FuncT& action, unsigned int threadCount = getSystemThreadCount());
void parallelize(BegIndexT beginIndex, EndIndexT endIndex, const FuncT& action, unsigned int taskCount = getSystemThreadCount());

/// Calls a function in parallel over an iterator range.
/// The iterators are automatically split, giving a separate start/past-the-end iterator range to each thread.
/// The given iterator range is automatically split, providing a separate start/past-the-end iterator sub-range to each task.
/// \note If using Emscripten this call will be synchronous, threads being unsupported with it for now.
/// \tparam IterT Type of the iterators.
/// \tparam FuncT Type of the action to be executed.
/// \param begin Begin iterator of the whole range. Must be lower than the end iterator.
/// \param end End iterator of the whole range. Must be greater than the begin iterator.
/// \param action Action to be performed by each thread, taking an iterator range as boundaries.
/// \param threadCount Amount of threads to start an instance on.
/// \param action Action to be performed in parallel, taking an iterator range as boundaries.
/// \param taskCount Amount of tasks to start.
template <typename IterT, typename FuncT, typename = typename std::iterator_traits<IterT>::iterator_category>
void parallelize(IterT begin, IterT end, const FuncT& action, unsigned int threadCount = getSystemThreadCount());
void parallelize(IterT begin, IterT end, const FuncT& action, unsigned int taskCount = getSystemThreadCount());

/// Calls a function in parallel over a collection.
/// The collection is automatically split, giving a separate start/past-the-end iterator range to each thread.
/// The given collection is automatically split, providing a separate start/past-the-end iterator sub-range to each task.
/// \note The container must either be a constant-size C array, or have public begin() & end() functions.
/// \note If using Emscripten this call will be synchronous, threads being unsupported with it for now.
/// \tparam ContainerT Type of the collection to iterate over.
/// \tparam FuncT Type of the action to be executed.
/// \param collection Collection to iterate over on multiple threads.
/// \param action Action to be performed by each thread, taking an iterator range as boundaries.
/// \param threadCount Amount of threads to start an instance on.
/// \param collection Collection to iterate over in parallel.
/// \param action Action to be performed in parallel, taking an iterator range as boundaries.
/// \param taskCount Amount of tasks to start.
template <typename ContainerT, typename FuncT, typename = decltype(std::begin(std::declval<ContainerT>()))>
void parallelize(ContainerT&& collection, FuncT&& action, unsigned int threadCount = getSystemThreadCount()) {
parallelize(std::begin(collection), std::end(collection), std::forward<FuncT>(action), threadCount);
void parallelize(ContainerT&& collection, FuncT&& action, unsigned int taskCount = getSystemThreadCount()) {
parallelize(std::begin(collection), std::end(collection), std::forward<FuncT>(action), taskCount);
}

} // namespace Threading
Expand Down
66 changes: 33 additions & 33 deletions include/RaZ/Utils/Threading.inl
Original file line number Diff line number Diff line change
Expand Up @@ -17,57 +17,57 @@ std::future<ResultT> launchAsync(FuncT&& action, Args&&... args) {
}

template <typename BegIndexT, typename EndIndexT, typename FuncT, typename>
void parallelize(BegIndexT beginIndex, EndIndexT endIndex, const FuncT& action, unsigned int threadCount) {
void parallelize(BegIndexT beginIndex, EndIndexT endIndex, const FuncT& action, unsigned int taskCount) {
static_assert(std::is_invocable_v<FuncT, IndexRange>, "Error: The given action must take an IndexRange as parameter");

if (threadCount == 0)
throw std::invalid_argument("[Threading] The number of threads cannot be 0.");
if (taskCount == 0)
throw std::invalid_argument("[Threading] The number of tasks cannot be 0.");

if (static_cast<std::ptrdiff_t>(beginIndex) >= static_cast<std::ptrdiff_t>(endIndex))
throw std::invalid_argument("[Threading] The given index range is invalid.");

#if !defined(RAZ_PLATFORM_EMSCRIPTEN)
ThreadPool& threadPool = getDefaultThreadPool();

const auto totalRangeCount = static_cast<std::size_t>(endIndex) - static_cast<std::size_t>(beginIndex);
const std::size_t maxThreadCount = std::min(static_cast<std::size_t>(threadCount), totalRangeCount);
const auto totalRangeCount = static_cast<std::size_t>(endIndex) - static_cast<std::size_t>(beginIndex);
const std::size_t maxTaskCount = std::min(static_cast<std::size_t>(taskCount), totalRangeCount);

const std::size_t perThreadRangeCount = totalRangeCount / maxThreadCount;
std::size_t remainderElementCount = totalRangeCount % maxThreadCount;
std::size_t threadBeginIndex = 0;
const std::size_t perTaskRangeCount = totalRangeCount / maxTaskCount;
std::size_t remainderElementCount = totalRangeCount % maxTaskCount;
std::size_t taskBeginIndex = 0;

std::vector<std::promise<void>> promises;
promises.resize(maxThreadCount);
promises.resize(maxTaskCount);

for (std::size_t threadIndex = 0; threadIndex < maxThreadCount; ++threadIndex) {
const std::size_t threadEndIndex = threadBeginIndex + perThreadRangeCount + (remainderElementCount > 0 ? 1 : 0);
for (std::size_t taskIndex = 0; taskIndex < maxTaskCount; ++taskIndex) {
const std::size_t taskEndIndex = taskBeginIndex + perTaskRangeCount + (remainderElementCount > 0 ? 1 : 0);

threadPool.addAction([&action, &promises, threadBeginIndex, threadEndIndex, threadIndex] () noexcept(std::is_nothrow_invocable_v<FuncT, IndexRange>) {
action(IndexRange{ threadBeginIndex, threadEndIndex });
promises[threadIndex].set_value();
threadPool.addTask([&action, &promises, taskBeginIndex, taskEndIndex, taskIndex] () noexcept(std::is_nothrow_invocable_v<FuncT, IndexRange>) {
action(IndexRange{ taskBeginIndex, taskEndIndex });
promises[taskIndex].set_value();
});

threadBeginIndex = threadEndIndex;
taskBeginIndex = taskEndIndex;

if (remainderElementCount > 0)
--remainderElementCount;
}

// Blocking here to wait for all threads to finish their action
// Blocking here waiting for all tasks to be finished
for (std::promise<void>& promise : promises)
promise.get_future().wait();
#else
static_cast<void>(threadCount);
static_cast<void>(taskCount);
action(IndexRange{ static_cast<std::size_t>(beginIndex), static_cast<std::size_t>(endIndex) });
#endif
}

template <typename IterT, typename FuncT, typename>
void parallelize(IterT begin, IterT end, const FuncT& action, unsigned int threadCount) {
void parallelize(IterT begin, IterT end, const FuncT& action, unsigned int taskCount) {
static_assert(std::is_invocable_v<FuncT, IterRange<IterT>>, "Error: The given action must take an IterRange as parameter");

if (threadCount == 0)
throw std::invalid_argument("[Threading] The number of threads cannot be 0.");
if (taskCount == 0)
throw std::invalid_argument("[Threading] The number of tasks cannot be 0.");

const auto totalRangeCount = std::distance(begin, end);

Expand All @@ -77,34 +77,34 @@ void parallelize(IterT begin, IterT end, const FuncT& action, unsigned int threa
#if !defined(RAZ_PLATFORM_EMSCRIPTEN)
ThreadPool& threadPool = getDefaultThreadPool();

const std::size_t maxThreadCount = std::min(static_cast<std::size_t>(threadCount), static_cast<std::size_t>(totalRangeCount));
const std::size_t maxTaskCount = std::min(static_cast<std::size_t>(taskCount), static_cast<std::size_t>(totalRangeCount));

const std::size_t perThreadRangeCount = static_cast<std::size_t>(totalRangeCount) / maxThreadCount;
std::size_t remainderElementCount = static_cast<std::size_t>(totalRangeCount) % maxThreadCount;
IterT threadBeginIter = begin;
const std::size_t perTaskRangeCount = static_cast<std::size_t>(totalRangeCount) / maxTaskCount;
std::size_t remainderElementCount = static_cast<std::size_t>(totalRangeCount) % maxTaskCount;
IterT taskBeginIter = begin;

std::vector<std::promise<void>> promises;
promises.resize(maxThreadCount);
promises.resize(maxTaskCount);

for (std::size_t threadIndex = 0; threadIndex < maxThreadCount; ++threadIndex) {
const IterT threadEndIter = std::next(threadBeginIter, static_cast<std::ptrdiff_t>(perThreadRangeCount + (remainderElementCount > 0 ? 1 : 0)));
for (std::size_t taskIndex = 0; taskIndex < maxTaskCount; ++taskIndex) {
const IterT taskEndIter = std::next(taskBeginIter, static_cast<std::ptrdiff_t>(perTaskRangeCount + (remainderElementCount > 0 ? 1 : 0)));

threadPool.addAction([&action, &promises, threadBeginIter, threadEndIter, threadIndex] () noexcept(std::is_nothrow_invocable_v<FuncT, IterRange<IterT>>) {
action(IterRange<IterT>(threadBeginIter, threadEndIter));
promises[threadIndex].set_value();
threadPool.addTask([&action, &promises, taskBeginIter, taskEndIter, taskIndex] () noexcept(std::is_nothrow_invocable_v<FuncT, IterRange<IterT>>) {
action(IterRange<IterT>(taskBeginIter, taskEndIter));
promises[taskIndex].set_value();
});

threadBeginIter = threadEndIter;
taskBeginIter = taskEndIter;

if (remainderElementCount > 0)
--remainderElementCount;
}

// Blocking here to wait for all threads to finish their action
// Blocking here waiting for all tasks to be finished
for (std::promise<void>& promise : promises)
promise.get_future().wait();
#else
static_cast<void>(threadCount);
static_cast<void>(taskCount);
action(IterRange<IterT>(begin, end));
#endif
}
Expand Down
20 changes: 10 additions & 10 deletions src/RaZ/Utils/ThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,32 @@ ThreadPool::ThreadPool(unsigned int threadCount) {
const std::string threadName = "Thread pool - #" + std::to_string(threadIndex + 1);
tracy::SetThreadName(threadName.c_str());

std::function<void()> action;
std::function<void()> task;

while (true) {
{
std::unique_lock<std::mutex> lock(m_actionsMutex);
m_condVar.wait(lock, [this] () { return (!m_actions.empty() || m_shouldStop); });
std::unique_lock<std::mutex> lock(m_tasksMutex);
m_condVar.wait(lock, [this] () { return (!m_tasks.empty() || m_shouldStop); });

if (m_shouldStop)
return;

action = std::move(m_actions.front());
m_actions.pop();
task = std::move(m_tasks.front());
m_tasks.pop();
}

action();
task();
}
});
}

Logger::debug("[ThreadPool] Initialized");
}

void ThreadPool::addAction(std::function<void()> action) {
void ThreadPool::addTask(std::function<void()> task) {
{
std::lock_guard<std::mutex> lock(m_actionsMutex);
m_actions.push(std::move(action));
const std::lock_guard<std::mutex> lock(m_tasksMutex);
m_tasks.push(std::move(task));
}

m_condVar.notify_one();
Expand All @@ -57,7 +57,7 @@ ThreadPool::~ThreadPool() {
Logger::debug("[ThreadPool] Destroying...");

{
std::lock_guard<std::mutex> lock(m_actionsMutex);
const std::lock_guard<std::mutex> lock(m_tasksMutex);
m_shouldStop = true;
}

Expand Down
28 changes: 14 additions & 14 deletions src/RaZ/Utils/Threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,28 @@ ThreadPool& getDefaultThreadPool() {
return threadPool;
}

void parallelize(const std::function<void()>& action, unsigned int threadCount) {
if (threadCount == 0)
throw std::invalid_argument("[Threading] The number of threads cannot be 0.");
void parallelize(const std::function<void()>& action, unsigned int taskCount) {
if (taskCount == 0)
throw std::invalid_argument("[Threading] The number of tasks cannot be 0.");

#if !defined(RAZ_PLATFORM_EMSCRIPTEN)
ThreadPool& threadPool = getDefaultThreadPool();

std::vector<std::promise<void>> promises;
promises.resize(threadCount);
promises.resize(taskCount);

for (unsigned int i = 0; i < threadCount; ++i) {
threadPool.addAction([&action, &promises, i] () {
for (unsigned int taskIndex = 0; taskIndex < taskCount; ++taskIndex) {
threadPool.addTask([&action, &promises, taskIndex] () {
action();
promises[i].set_value();
promises[taskIndex].set_value();
});
}

// Blocking here to wait for all threads to finish their action
// Blocking here waiting for all tasks to be finished
for (std::promise<void>& promise : promises)
promise.get_future().wait();
#else
for (unsigned int i = 0; i < threadCount; ++i)
for (unsigned int i = 0; i < taskCount; ++i)
action();
#endif
}
Expand All @@ -42,15 +42,15 @@ void parallelize(std::initializer_list<std::function<void()>> actions) {
std::vector<std::promise<void>> promises;
promises.resize(actions.size());

for (unsigned int i = 0; i < actions.size(); ++i) {
threadPool.addAction([&actions, &promises, i] () {
const std::function<void()>& action = *(actions.begin() + i);
for (unsigned int taskIndex = 0; taskIndex < actions.size(); ++taskIndex) {
threadPool.addTask([&actions, &promises, taskIndex] () {
const std::function<void()>& action = *(actions.begin() + taskIndex);
action();
promises[i].set_value();
promises[taskIndex].set_value();
});
}

// Blocking here to wait for all threads to finish their action
// Blocking here waiting for all tasks to be finished
for (std::promise<void>& promise : promises)
promise.get_future().wait();
#else
Expand Down
6 changes: 3 additions & 3 deletions tests/src/RaZ/Utils/ThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ TEST_CASE("ThreadPool basic", "[utils]") {
Raz::ThreadPool pool;
std::atomic<int> i = 0;

pool.addAction([&i] () noexcept { ++i; });
pool.addAction([&i] () noexcept { ++i; });
pool.addAction([&i] () noexcept { ++i; });
pool.addTask([&i] () noexcept { ++i; });
pool.addTask([&i] () noexcept { ++i; });
pool.addTask([&i] () noexcept { ++i; });
Raz::Threading::sleep(10); // Waiting a bit for the result to be available

CHECK(i == 3);
Expand Down

0 comments on commit 54a2eac

Please sign in to comment.