diff --git a/include/RaZ/Utils/ThreadPool.hpp b/include/RaZ/Utils/ThreadPool.hpp index e881d0c4..43ec2b39 100644 --- a/include/RaZ/Utils/ThreadPool.hpp +++ b/include/RaZ/Utils/ThreadPool.hpp @@ -27,7 +27,7 @@ class ThreadPool { ThreadPool(); explicit ThreadPool(unsigned int threadCount); - void addAction(std::function action); + void addTask(std::function task); ~ThreadPool(); @@ -35,9 +35,9 @@ class ThreadPool { std::vector m_threads {}; bool m_shouldStop = false; - std::mutex m_actionsMutex {}; + std::mutex m_tasksMutex {}; std::condition_variable m_condVar {}; - std::queue> m_actions {}; + std::queue> m_tasks {}; }; } // namespace Raz diff --git a/include/RaZ/Utils/Threading.hpp b/include/RaZ/Utils/Threading.hpp index 29f58920..6de38916 100644 --- a/include/RaZ/Utils/Threading.hpp +++ b/include/RaZ/Utils/Threading.hpp @@ -71,52 +71,52 @@ template & action, unsigned int threadCount = getSystemThreadCount()); +/// \param action Action to be performed in parallel. +/// \param taskCount Amount of tasks to start. +void parallelize(const std::function& action, unsigned int taskCount = getSystemThreadCount()); -/// Calls the given functions in parallel, each on its own thread. +/// Calls the given functions in parallel. /// \note If using Emscripten this call will be synchronous, threads being unsupported with it for now. /// \param actions Actions to be performed in parallel. void parallelize(std::initializer_list> actions); /// Calls a function in parallel over an index range. -/// The indices are automatically split, giving a separate start/past-the-end index range to each thread. +/// The given index range is automatically split, providing a separate start/past-the-end index sub-range to each task. /// \note If using Emscripten this call will be synchronous, threads being unsupported with it for now. /// \tparam BegIndexT Type of the begin index. /// \tparam EndIndexT Type of the end index. /// \tparam FuncT Type of the action to be executed. /// \param beginIndex Starting index of the whole range. Must be lower than the end index. /// \param endIndex Past-the-last index of the whole range. Must be greater than the begin index. -/// \param action Action to be performed by each thread, taking an index range as boundaries. -/// \param threadCount Amount of threads to start an instance on. +/// \param action Action to be performed in parallel, taking an index range as boundaries. +/// \param taskCount Amount of tasks to start. template && std::is_integral_v>> -void parallelize(BegIndexT beginIndex, EndIndexT endIndex, const FuncT& action, unsigned int threadCount = getSystemThreadCount()); +void parallelize(BegIndexT beginIndex, EndIndexT endIndex, const FuncT& action, unsigned int taskCount = getSystemThreadCount()); /// Calls a function in parallel over an iterator range. -/// The iterators are automatically split, giving a separate start/past-the-end iterator range to each thread. +/// The given iterator range is automatically split, providing a separate start/past-the-end iterator sub-range to each task. /// \note If using Emscripten this call will be synchronous, threads being unsupported with it for now. /// \tparam IterT Type of the iterators. /// \tparam FuncT Type of the action to be executed. /// \param begin Begin iterator of the whole range. Must be lower than the end iterator. /// \param end End iterator of the whole range. Must be greater than the begin iterator. -/// \param action Action to be performed by each thread, taking an iterator range as boundaries. -/// \param threadCount Amount of threads to start an instance on. +/// \param action Action to be performed in parallel, taking an iterator range as boundaries. +/// \param taskCount Amount of tasks to start. template ::iterator_category> -void parallelize(IterT begin, IterT end, const FuncT& action, unsigned int threadCount = getSystemThreadCount()); +void parallelize(IterT begin, IterT end, const FuncT& action, unsigned int taskCount = getSystemThreadCount()); /// Calls a function in parallel over a collection. -/// The collection is automatically split, giving a separate start/past-the-end iterator range to each thread. +/// The given collection is automatically split, providing a separate start/past-the-end iterator sub-range to each task. /// \note The container must either be a constant-size C array, or have public begin() & end() functions. /// \note If using Emscripten this call will be synchronous, threads being unsupported with it for now. /// \tparam ContainerT Type of the collection to iterate over. /// \tparam FuncT Type of the action to be executed. -/// \param collection Collection to iterate over on multiple threads. -/// \param action Action to be performed by each thread, taking an iterator range as boundaries. -/// \param threadCount Amount of threads to start an instance on. +/// \param collection Collection to iterate over in parallel. +/// \param action Action to be performed in parallel, taking an iterator range as boundaries. +/// \param taskCount Amount of tasks to start. template ()))> -void parallelize(ContainerT&& collection, FuncT&& action, unsigned int threadCount = getSystemThreadCount()) { - parallelize(std::begin(collection), std::end(collection), std::forward(action), threadCount); +void parallelize(ContainerT&& collection, FuncT&& action, unsigned int taskCount = getSystemThreadCount()) { + parallelize(std::begin(collection), std::end(collection), std::forward(action), taskCount); } } // namespace Threading diff --git a/include/RaZ/Utils/Threading.inl b/include/RaZ/Utils/Threading.inl index 4f553573..34a44a57 100644 --- a/include/RaZ/Utils/Threading.inl +++ b/include/RaZ/Utils/Threading.inl @@ -17,11 +17,11 @@ std::future launchAsync(FuncT&& action, Args&&... args) { } template -void parallelize(BegIndexT beginIndex, EndIndexT endIndex, const FuncT& action, unsigned int threadCount) { +void parallelize(BegIndexT beginIndex, EndIndexT endIndex, const FuncT& action, unsigned int taskCount) { static_assert(std::is_invocable_v, "Error: The given action must take an IndexRange as parameter"); - if (threadCount == 0) - throw std::invalid_argument("[Threading] The number of threads cannot be 0."); + if (taskCount == 0) + throw std::invalid_argument("[Threading] The number of tasks cannot be 0."); if (static_cast(beginIndex) >= static_cast(endIndex)) throw std::invalid_argument("[Threading] The given index range is invalid."); @@ -29,45 +29,45 @@ void parallelize(BegIndexT beginIndex, EndIndexT endIndex, const FuncT& action, #if !defined(RAZ_PLATFORM_EMSCRIPTEN) ThreadPool& threadPool = getDefaultThreadPool(); - const auto totalRangeCount = static_cast(endIndex) - static_cast(beginIndex); - const std::size_t maxThreadCount = std::min(static_cast(threadCount), totalRangeCount); + const auto totalRangeCount = static_cast(endIndex) - static_cast(beginIndex); + const std::size_t maxTaskCount = std::min(static_cast(taskCount), totalRangeCount); - const std::size_t perThreadRangeCount = totalRangeCount / maxThreadCount; - std::size_t remainderElementCount = totalRangeCount % maxThreadCount; - std::size_t threadBeginIndex = 0; + const std::size_t perTaskRangeCount = totalRangeCount / maxTaskCount; + std::size_t remainderElementCount = totalRangeCount % maxTaskCount; + std::size_t taskBeginIndex = 0; std::vector> promises; - promises.resize(maxThreadCount); + promises.resize(maxTaskCount); - for (std::size_t threadIndex = 0; threadIndex < maxThreadCount; ++threadIndex) { - const std::size_t threadEndIndex = threadBeginIndex + perThreadRangeCount + (remainderElementCount > 0 ? 1 : 0); + for (std::size_t taskIndex = 0; taskIndex < maxTaskCount; ++taskIndex) { + const std::size_t taskEndIndex = taskBeginIndex + perTaskRangeCount + (remainderElementCount > 0 ? 1 : 0); - threadPool.addAction([&action, &promises, threadBeginIndex, threadEndIndex, threadIndex] () noexcept(std::is_nothrow_invocable_v) { - action(IndexRange{ threadBeginIndex, threadEndIndex }); - promises[threadIndex].set_value(); + threadPool.addTask([&action, &promises, taskBeginIndex, taskEndIndex, taskIndex] () noexcept(std::is_nothrow_invocable_v) { + action(IndexRange{ taskBeginIndex, taskEndIndex }); + promises[taskIndex].set_value(); }); - threadBeginIndex = threadEndIndex; + taskBeginIndex = taskEndIndex; if (remainderElementCount > 0) --remainderElementCount; } - // Blocking here to wait for all threads to finish their action + // Blocking here waiting for all tasks to be finished for (std::promise& promise : promises) promise.get_future().wait(); #else - static_cast(threadCount); + static_cast(taskCount); action(IndexRange{ static_cast(beginIndex), static_cast(endIndex) }); #endif } template -void parallelize(IterT begin, IterT end, const FuncT& action, unsigned int threadCount) { +void parallelize(IterT begin, IterT end, const FuncT& action, unsigned int taskCount) { static_assert(std::is_invocable_v>, "Error: The given action must take an IterRange as parameter"); - if (threadCount == 0) - throw std::invalid_argument("[Threading] The number of threads cannot be 0."); + if (taskCount == 0) + throw std::invalid_argument("[Threading] The number of tasks cannot be 0."); const auto totalRangeCount = std::distance(begin, end); @@ -77,34 +77,34 @@ void parallelize(IterT begin, IterT end, const FuncT& action, unsigned int threa #if !defined(RAZ_PLATFORM_EMSCRIPTEN) ThreadPool& threadPool = getDefaultThreadPool(); - const std::size_t maxThreadCount = std::min(static_cast(threadCount), static_cast(totalRangeCount)); + const std::size_t maxTaskCount = std::min(static_cast(taskCount), static_cast(totalRangeCount)); - const std::size_t perThreadRangeCount = static_cast(totalRangeCount) / maxThreadCount; - std::size_t remainderElementCount = static_cast(totalRangeCount) % maxThreadCount; - IterT threadBeginIter = begin; + const std::size_t perTaskRangeCount = static_cast(totalRangeCount) / maxTaskCount; + std::size_t remainderElementCount = static_cast(totalRangeCount) % maxTaskCount; + IterT taskBeginIter = begin; std::vector> promises; - promises.resize(maxThreadCount); + promises.resize(maxTaskCount); - for (std::size_t threadIndex = 0; threadIndex < maxThreadCount; ++threadIndex) { - const IterT threadEndIter = std::next(threadBeginIter, static_cast(perThreadRangeCount + (remainderElementCount > 0 ? 1 : 0))); + for (std::size_t taskIndex = 0; taskIndex < maxTaskCount; ++taskIndex) { + const IterT taskEndIter = std::next(taskBeginIter, static_cast(perTaskRangeCount + (remainderElementCount > 0 ? 1 : 0))); - threadPool.addAction([&action, &promises, threadBeginIter, threadEndIter, threadIndex] () noexcept(std::is_nothrow_invocable_v>) { - action(IterRange(threadBeginIter, threadEndIter)); - promises[threadIndex].set_value(); + threadPool.addTask([&action, &promises, taskBeginIter, taskEndIter, taskIndex] () noexcept(std::is_nothrow_invocable_v>) { + action(IterRange(taskBeginIter, taskEndIter)); + promises[taskIndex].set_value(); }); - threadBeginIter = threadEndIter; + taskBeginIter = taskEndIter; if (remainderElementCount > 0) --remainderElementCount; } - // Blocking here to wait for all threads to finish their action + // Blocking here waiting for all tasks to be finished for (std::promise& promise : promises) promise.get_future().wait(); #else - static_cast(threadCount); + static_cast(taskCount); action(IterRange(begin, end)); #endif } diff --git a/src/RaZ/Utils/ThreadPool.cpp b/src/RaZ/Utils/ThreadPool.cpp index eaa47eb1..5c7a9f54 100644 --- a/src/RaZ/Utils/ThreadPool.cpp +++ b/src/RaZ/Utils/ThreadPool.cpp @@ -20,21 +20,21 @@ ThreadPool::ThreadPool(unsigned int threadCount) { const std::string threadName = "Thread pool - #" + std::to_string(threadIndex + 1); tracy::SetThreadName(threadName.c_str()); - std::function action; + std::function task; while (true) { { - std::unique_lock lock(m_actionsMutex); - m_condVar.wait(lock, [this] () { return (!m_actions.empty() || m_shouldStop); }); + std::unique_lock lock(m_tasksMutex); + m_condVar.wait(lock, [this] () { return (!m_tasks.empty() || m_shouldStop); }); if (m_shouldStop) return; - action = std::move(m_actions.front()); - m_actions.pop(); + task = std::move(m_tasks.front()); + m_tasks.pop(); } - action(); + task(); } }); } @@ -42,10 +42,10 @@ ThreadPool::ThreadPool(unsigned int threadCount) { Logger::debug("[ThreadPool] Initialized"); } -void ThreadPool::addAction(std::function action) { +void ThreadPool::addTask(std::function task) { { - std::lock_guard lock(m_actionsMutex); - m_actions.push(std::move(action)); + const std::lock_guard lock(m_tasksMutex); + m_tasks.push(std::move(task)); } m_condVar.notify_one(); @@ -57,7 +57,7 @@ ThreadPool::~ThreadPool() { Logger::debug("[ThreadPool] Destroying..."); { - std::lock_guard lock(m_actionsMutex); + const std::lock_guard lock(m_tasksMutex); m_shouldStop = true; } diff --git a/src/RaZ/Utils/Threading.cpp b/src/RaZ/Utils/Threading.cpp index 16b1288f..7427f2ea 100644 --- a/src/RaZ/Utils/Threading.cpp +++ b/src/RaZ/Utils/Threading.cpp @@ -9,28 +9,28 @@ ThreadPool& getDefaultThreadPool() { return threadPool; } -void parallelize(const std::function& action, unsigned int threadCount) { - if (threadCount == 0) - throw std::invalid_argument("[Threading] The number of threads cannot be 0."); +void parallelize(const std::function& action, unsigned int taskCount) { + if (taskCount == 0) + throw std::invalid_argument("[Threading] The number of tasks cannot be 0."); #if !defined(RAZ_PLATFORM_EMSCRIPTEN) ThreadPool& threadPool = getDefaultThreadPool(); std::vector> promises; - promises.resize(threadCount); + promises.resize(taskCount); - for (unsigned int i = 0; i < threadCount; ++i) { - threadPool.addAction([&action, &promises, i] () { + for (unsigned int taskIndex = 0; taskIndex < taskCount; ++taskIndex) { + threadPool.addTask([&action, &promises, taskIndex] () { action(); - promises[i].set_value(); + promises[taskIndex].set_value(); }); } - // Blocking here to wait for all threads to finish their action + // Blocking here waiting for all tasks to be finished for (std::promise& promise : promises) promise.get_future().wait(); #else - for (unsigned int i = 0; i < threadCount; ++i) + for (unsigned int i = 0; i < taskCount; ++i) action(); #endif } @@ -42,15 +42,15 @@ void parallelize(std::initializer_list> actions) { std::vector> promises; promises.resize(actions.size()); - for (unsigned int i = 0; i < actions.size(); ++i) { - threadPool.addAction([&actions, &promises, i] () { - const std::function& action = *(actions.begin() + i); + for (unsigned int taskIndex = 0; taskIndex < actions.size(); ++taskIndex) { + threadPool.addTask([&actions, &promises, taskIndex] () { + const std::function& action = *(actions.begin() + taskIndex); action(); - promises[i].set_value(); + promises[taskIndex].set_value(); }); } - // Blocking here to wait for all threads to finish their action + // Blocking here waiting for all tasks to be finished for (std::promise& promise : promises) promise.get_future().wait(); #else diff --git a/tests/src/RaZ/Utils/ThreadPool.cpp b/tests/src/RaZ/Utils/ThreadPool.cpp index d4bde338..8b35b8a1 100644 --- a/tests/src/RaZ/Utils/ThreadPool.cpp +++ b/tests/src/RaZ/Utils/ThreadPool.cpp @@ -9,9 +9,9 @@ TEST_CASE("ThreadPool basic", "[utils]") { Raz::ThreadPool pool; std::atomic i = 0; - pool.addAction([&i] () noexcept { ++i; }); - pool.addAction([&i] () noexcept { ++i; }); - pool.addAction([&i] () noexcept { ++i; }); + pool.addTask([&i] () noexcept { ++i; }); + pool.addTask([&i] () noexcept { ++i; }); + pool.addTask([&i] () noexcept { ++i; }); Raz::Threading::sleep(10); // Waiting a bit for the result to be available CHECK(i == 3);