From 08024049c26871873eddc5ef9737089f96599fd2 Mon Sep 17 00:00:00 2001 From: Michael Fabian 'Xaymar' Dirks Date: Tue, 3 Mar 2020 01:49:26 +0100 Subject: [PATCH] util-threadpool: Implement global thread pool This thread pool can take large or small tasks and as such alleviates the burden of having a thread per source. Particularly for large setups, this drastically reduces the number of threads running in the background waiting for work. --- CMakeLists.txt | 2 ++ source/plugin.cpp | 11 ++++++ source/plugin.hpp | 4 +++ source/util-threadpool.cpp | 71 ++++++++++++++++++++++++++++++++++++++ source/util-threadpool.hpp | 46 ++++++++++++++++++++++++ 5 files changed, 134 insertions(+) create mode 100644 source/util-threadpool.cpp create mode 100644 source/util-threadpool.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 0965cf2c..8d7d23b8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -364,6 +364,8 @@ set(PROJECT_PRIVATE_SOURCE "${PROJECT_SOURCE_DIR}/source/utility.hpp" "${PROJECT_SOURCE_DIR}/source/utility.cpp" "${PROJECT_SOURCE_DIR}/source/util-event.hpp" + "${PROJECT_SOURCE_DIR}/source/util-threadpool.cpp" + "${PROJECT_SOURCE_DIR}/source/util-threadpool.hpp" # Graphics "${PROJECT_SOURCE_DIR}/source/gfx/gfx-source-texture.hpp" diff --git a/source/plugin.cpp b/source/plugin.cpp index 3058fb4b..cca9233b 100644 --- a/source/plugin.cpp +++ b/source/plugin.cpp @@ -35,10 +35,14 @@ #include "sources/source-mirror.hpp" #include "sources/source-shader.hpp" +static std::shared_ptr global_threadpool; + MODULE_EXPORT bool obs_module_load(void) try { LOG_INFO("Loading Version %s", STREAMFX_VERSION_STRING); + global_threadpool = std::make_shared(); + // Initialize Source Tracker obs::source_tracker::initialize(); @@ -90,6 +94,8 @@ try { // Finalize Source Tracker obs::source_tracker::finalize(); + + global_threadpool.reset(); } catch (...) { LOG_ERROR("Unexpected exception in function '%s'.", __FUNCTION_NAME__); } @@ -105,3 +111,8 @@ BOOL WINAPI DllMain(HINSTANCE, DWORD, LPVOID) return TRUE; } #endif + +std::shared_ptr get_global_threadpool() +{ + return global_threadpool; +} diff --git a/source/plugin.hpp b/source/plugin.hpp index 8e2579d7..3d55e03c 100644 --- a/source/plugin.hpp +++ b/source/plugin.hpp @@ -20,6 +20,7 @@ #pragma once #include "strings.hpp" #include "version.hpp" +#include "util-threadpool.hpp" // OBS #ifdef _MSC_VER @@ -46,3 +47,6 @@ #define __FUNCTION_NAME__ __func__ #endif #endif + +// Threadpool +std::shared_ptr get_global_threadpool(); diff --git a/source/util-threadpool.cpp b/source/util-threadpool.cpp new file mode 100644 index 00000000..a1660531 --- /dev/null +++ b/source/util-threadpool.cpp @@ -0,0 +1,71 @@ +/* + * Modern effects for a modern Streamer + * Copyright (C) 2020 Michael Fabian Dirks + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA +*/ + +#include "util-threadpool.hpp" + +// Most Tasks likely wait for IO, so we can use that time for other tasks. +#define CONCURRENCY_MULTIPLIER 2 + +util::threadpool::threadpool() : _workers(), _tasks(), _tasks_lock(), _tasks_cv(), _worker_stop(false) +{ + size_t concurrency = static_cast(std::thread::hardware_concurrency()) * CONCURRENCY_MULTIPLIER; + for (size_t n = 0; n < concurrency; n++) { + _workers.emplace_back(std::bind(&util::threadpool::work, this)); + } +} + +util::threadpool::~threadpool() +{ + _worker_stop = true; + _tasks_cv.notify_all(); + for (auto& thread : _workers) { + _tasks_cv.notify_all(); + if (thread.joinable()) + thread.join(); + } +} + +void util::threadpool::push(threadpool_function_t fn, std::shared_ptr data) +{ + std::unique_lock lock(_tasks_lock); + _tasks.emplace_back(fn, data); + _tasks_cv.notify_one(); +} + +void util::threadpool::work() +{ + std::pair> work; + + while (!_worker_stop) { + // Wait for more work, or immediately continue if there is still work to do. + { + std::unique_lock lock(_tasks_lock); + if (_tasks.size() == 0) + _tasks_cv.wait(lock, [this]() { return _worker_stop || _tasks.size() > 0; }); + if (_tasks.size() == 0) + continue; + work = _tasks.front(); + _tasks.pop_front(); + } + + // Execute work. + if (work.first) + work.first(work.second); + } +} diff --git a/source/util-threadpool.hpp b/source/util-threadpool.hpp new file mode 100644 index 00000000..bdb1ec9a --- /dev/null +++ b/source/util-threadpool.hpp @@ -0,0 +1,46 @@ +/* + * Modern effects for a modern Streamer + * Copyright (C) 2020 Michael Fabian Dirks + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + */ + +#pragma once +#include +#include +#include +#include +#include + +namespace util { + typedef std::function)> threadpool_function_t; + + class threadpool { + std::list _workers; + std::atomic_bool _worker_stop; + std::list>> _tasks; + std::mutex _tasks_lock; + std::condition_variable _tasks_cv; + + public: + threadpool(); + ~threadpool(); + + void push(threadpool_function_t fn, std::shared_ptr data); + + private: + void work(); + }; +} // namespace util