diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e5e7e1..3201107 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -125,6 +125,7 @@ set(PROJECT_PUBLIC "include/itask.hpp" "include/waitable.hpp" "include/permissions.hpp" + "include/threadpool.hpp" ) set(PROJECT_PUBLIC_GENERATED @@ -138,6 +139,7 @@ set(PROJECT_DATA # Private (only compiled/used locally) set(PROJECT_PRIVATE + "source/threadpool.cpp" ) # Libraries diff --git a/include/threadpool.hpp b/include/threadpool.hpp new file mode 100644 index 0000000..489a17d --- /dev/null +++ b/include/threadpool.hpp @@ -0,0 +1,72 @@ +/* + * Low Latency IPC Library for high-speed traffic + * Copyright (C) 2017-2019 Michael Fabian Dirks + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + */ + +#pragma once +#include +#include +#include +#include +#include +#include +#include "error.hpp" + +namespace datapath { + namespace threadpool { + typedef uint64_t affinity_t; + + constexpr affinity_t default_mask = std::numeric_limits::max(); + + struct task { + std::function function; + affinity_t mask = default_mask; + }; + + class pool { + struct worker { + affinity_t affinity; + bool should_stop = false; + + std::thread thread; + + std::mutex mutex; + std::queue> queue; + std::condition_variable signal; + + worker(affinity_t affinity); + ~worker(); + + void runner(); + + void clear(); + + void push(std::shared_ptr task); + }; + + std::map> workers; + + public: + pool(); + ~pool(); + + bool push(std::shared_ptr task); + + void clear(affinity_t mask = default_mask); + }; + } // namespace threadpool +} // namespace datapath diff --git a/source/threadpool.cpp b/source/threadpool.cpp new file mode 100644 index 0000000..17c156f --- /dev/null +++ b/source/threadpool.cpp @@ -0,0 +1,156 @@ +/* + * Low Latency IPC Library for high-speed traffic + * Copyright (C) 2017-2019 Michael Fabian Dirks + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + */ + +#include "threadpool.hpp" +#ifdef _WIN32 +#include +#endif + +datapath::threadpool::pool::worker::worker(affinity_t affinity) : affinity(affinity), should_stop(false) +{ + std::unique_lock qlock(this->mutex); + this->thread = std::thread(&datapath::threadpool::pool::worker::runner, this); +} + +datapath::threadpool::pool::worker::~worker() +{ + this->should_stop = true; + this->signal.notify_all(); + if (this->thread.joinable()) { + this->thread.join(); + } +} + +void datapath::threadpool::pool::worker::runner() +{ + std::shared_ptr my_task; + + // Assign affinity +#ifdef _WIN32 + SetThreadAffinityMask(reinterpret_cast(this->thread.native_handle()), this->affinity); +#else +#endif + + while (!this->should_stop) { + { // Grab any available work. + std::unique_lock lock(this->mutex); + if (this->queue.size() > 0) + my_task = this->queue.front(); + } + + if (my_task) // Execute work if we have any. + if (my_task->function) + my_task->function(); + + { // Remove it from the queue and wait. + std::unique_lock slock(this->mutex); + if (my_task) { + this->queue.pop(); + my_task.reset(); + } + if (this->queue.size() == 0) { + this->signal.wait(slock, + [this]() { return (this->should_stop) || (this->queue.size() > 0); }); + } + } + } +} + +void datapath::threadpool::pool::worker::clear() +{ + std::unique_lock slock(this->mutex); + while (this->queue.size() > 0) + this->queue.pop(); +} + +void datapath::threadpool::pool::worker::push(std::shared_ptr task) +{ + { + std::unique_lock slock(this->mutex); + this->queue.push(task); + } + this->signal.notify_all(); +} + +datapath::threadpool::pool::pool() +{ + // Spawn x number of threads for working. + uint64_t num_hw_concurrency = std::thread::hardware_concurrency(); + for (uint64_t idx = 0; idx < num_hw_concurrency; idx++) { + auto worker = std::make_shared(1 << idx); + this->workers.insert({idx, worker}); + } +} + +datapath::threadpool::pool::~pool() +{ + this->workers.clear(); +} + +bool datapath::threadpool::pool::push(std::shared_ptr task) +{ + // Early-Exit tests. + /// Check for null or invalid tasks. + if (!task) { + throw std::invalid_argument("task must not be nullptr"); + } + if (!task->function) { + throw std::invalid_argument("task->function must not be nullptr"); + } + /// Check for invalid affinity masks. + if ((task->mask & (this->workers.size() - 1)) == 0) { + throw std::invalid_argument("mask does not fit any thread"); + } + + affinity_t lowest_id; + size_t lowest_count = std::numeric_limits::max(); + for (auto kv : workers) { + if ((kv.second->affinity & task->mask) == 0) { + continue; + } + + std::unique_lock lock(kv.second->mutex); + if (kv.second->queue.size() < lowest_count) { + lowest_id = kv.first; + lowest_count = kv.second->queue.size(); + } + } + if (lowest_count == std::numeric_limits::max()) { + return false; + } + + this->workers[lowest_id]->push(task); + return true; +} + +void datapath::threadpool::pool::clear(affinity_t mask) +{ + // Early-Exit tests. + if ((mask & (this->workers.size() - 1)) == 0) { + throw std::invalid_argument("mask does not fit any thread"); + } + + for (auto kv : workers) { + if ((kv.second->affinity & mask) == 0) { + continue; + } + + kv.second->clear(); + } +}