diff --git a/source/filters/filter-autoframing.cpp b/source/filters/filter-autoframing.cpp index c9151ac3..ab54daf6 100644 --- a/source/filters/filter-autoframing.cpp +++ b/source/filters/filter-autoframing.cpp @@ -897,7 +897,7 @@ void streamfx::filter::autoframing::autoframing_instance::switch_provider(tracki std::bind(&autoframing_instance::task_switch_provider, this, std::placeholders::_1), spd); } -void streamfx::filter::autoframing::autoframing_instance::task_switch_provider(util::threadpool_data_t data) +void streamfx::filter::autoframing::autoframing_instance::task_switch_provider(util::threadpool::task_data_t data) { std::shared_ptr spd = std::static_pointer_cast(data); diff --git a/source/filters/filter-autoframing.hpp b/source/filters/filter-autoframing.hpp index 29ee24ff..a4aa5082 100644 --- a/source/filters/filter-autoframing.hpp +++ b/source/filters/filter-autoframing.hpp @@ -148,7 +148,7 @@ namespace streamfx::filter::autoframing { void tracking_tick(float seconds); void switch_provider(tracking_provider provider); - void task_switch_provider(util::threadpool_data_t data); + void task_switch_provider(util::threadpool::task_data_t data); #ifdef ENABLE_FILTER_AUTOFRAMING_NVIDIA void nvar_facedetection_load(); diff --git a/source/filters/filter-denoising.cpp b/source/filters/filter-denoising.cpp index 7b38ccee..19b64f50 100644 --- a/source/filters/filter-denoising.cpp +++ b/source/filters/filter-denoising.cpp @@ -409,7 +409,7 @@ void streamfx::filter::denoising::denoising_instance::switch_provider(denoising_ std::bind(&denoising_instance::task_switch_provider, this, std::placeholders::_1), spd); } -void streamfx::filter::denoising::denoising_instance::task_switch_provider(util::threadpool_data_t data) +void streamfx::filter::denoising::denoising_instance::task_switch_provider(util::threadpool::task_data_t data) { std::shared_ptr spd = std::static_pointer_cast(data); diff --git a/source/filters/filter-denoising.hpp b/source/filters/filter-denoising.hpp index a524e6a9..72f67bf6 100644 --- a/source/filters/filter-denoising.hpp +++ b/source/filters/filter-denoising.hpp @@ -85,7 +85,7 @@ namespace streamfx::filter::denoising { private: void switch_provider(denoising_provider provider); - void task_switch_provider(util::threadpool_data_t data); + void task_switch_provider(util::threadpool::task_data_t data); #ifdef ENABLE_FILTER_DENOISING_NVIDIA void nvvfx_denoising_load(); diff --git a/source/filters/filter-upscaling.cpp b/source/filters/filter-upscaling.cpp index cc58f564..ab69ed4f 100644 --- a/source/filters/filter-upscaling.cpp +++ b/source/filters/filter-upscaling.cpp @@ -394,7 +394,7 @@ void streamfx::filter::upscaling::upscaling_instance::switch_provider(upscaling_ std::bind(&upscaling_instance::task_switch_provider, this, std::placeholders::_1), spd); } -void streamfx::filter::upscaling::upscaling_instance::task_switch_provider(util::threadpool_data_t data) +void streamfx::filter::upscaling::upscaling_instance::task_switch_provider(util::threadpool::task_data_t data) { std::shared_ptr spd = std::static_pointer_cast(data); diff --git a/source/filters/filter-upscaling.hpp b/source/filters/filter-upscaling.hpp index 35932bde..2491f6e6 100644 --- a/source/filters/filter-upscaling.hpp +++ b/source/filters/filter-upscaling.hpp @@ -86,7 +86,7 @@ namespace streamfx::filter::upscaling { private: void switch_provider(upscaling_provider provider); - void task_switch_provider(util::threadpool_data_t data); + void task_switch_provider(util::threadpool::task_data_t data); #ifdef ENABLE_FILTER_UPSCALING_NVIDIA void nvvfxsr_load(); diff --git a/source/filters/filter-virtual-greenscreen.cpp b/source/filters/filter-virtual-greenscreen.cpp index 62fb96f6..7d664de4 100644 --- a/source/filters/filter-virtual-greenscreen.cpp +++ b/source/filters/filter-virtual-greenscreen.cpp @@ -406,7 +406,7 @@ void streamfx::filter::virtual_greenscreen::virtual_greenscreen_instance::switch } void streamfx::filter::virtual_greenscreen::virtual_greenscreen_instance::task_switch_provider( - util::threadpool_data_t data) + util::threadpool::task_data_t data) { std::shared_ptr spd = std::static_pointer_cast(data); diff --git a/source/filters/filter-virtual-greenscreen.hpp b/source/filters/filter-virtual-greenscreen.hpp index c170add7..267e1b20 100644 --- a/source/filters/filter-virtual-greenscreen.hpp +++ b/source/filters/filter-virtual-greenscreen.hpp @@ -86,7 +86,7 @@ namespace streamfx::filter::virtual_greenscreen { private: void switch_provider(virtual_greenscreen_provider provider); - void task_switch_provider(util::threadpool_data_t data); + void task_switch_provider(util::threadpool::task_data_t data); #ifdef ENABLE_FILTER_VIRTUAL_GREENSCREEN_NVIDIA void nvvfxgs_load(); diff --git a/source/plugin.cpp b/source/plugin.cpp index 25dff0c0..2bcf0788 100644 --- a/source/plugin.cpp +++ b/source/plugin.cpp @@ -94,10 +94,10 @@ #include #include "warning-enable.hpp" -static std::shared_ptr _threadpool; -static std::shared_ptr _gs_fstri_vb; -static std::shared_ptr _streamfx_gfx_opengl; -static std::shared_ptr _source_tracker; +static std::shared_ptr _threadpool; +static std::shared_ptr _gs_fstri_vb; +static std::shared_ptr _streamfx_gfx_opengl; +static std::shared_ptr _source_tracker; MODULE_EXPORT bool obs_module_load(void) { @@ -108,7 +108,7 @@ MODULE_EXPORT bool obs_module_load(void) streamfx::configuration::initialize(); // Initialize global Thread Pool. - _threadpool = std::make_shared(); + _threadpool = std::make_shared(); // Initialize Source Tracker _source_tracker = streamfx::obs::source_tracker::get(); @@ -338,7 +338,7 @@ MODULE_EXPORT void obs_module_unload(void) } } -std::shared_ptr streamfx::threadpool() +std::shared_ptr streamfx::threadpool() { return _threadpool; } diff --git a/source/plugin.hpp b/source/plugin.hpp index 3475bdb4..c2f8a7a6 100644 --- a/source/plugin.hpp +++ b/source/plugin.hpp @@ -22,7 +22,7 @@ namespace streamfx { // Threadpool - std::shared_ptr threadpool(); + std::shared_ptr threadpool(); void gs_draw_fullscreen_tri(); diff --git a/source/updater.cpp b/source/updater.cpp index 98b5924b..1144bbc8 100644 --- a/source/updater.cpp +++ b/source/updater.cpp @@ -228,7 +228,7 @@ streamfx::version_info::operator std::string() } } -void streamfx::updater::task(streamfx::util::threadpool_data_t) +void streamfx::updater::task(streamfx::util::threadpool::task_data_t) { try { auto query_fn = [](std::vector& buffer) { diff --git a/source/updater.hpp b/source/updater.hpp index 7b8a8fdc..7e532786 100644 --- a/source/updater.hpp +++ b/source/updater.hpp @@ -82,7 +82,7 @@ namespace streamfx { bool _dirty; private: - void task(streamfx::util::threadpool_data_t); + void task(streamfx::util::threadpool::task_data_t); bool can_check(); diff --git a/source/util/util-threadpool.cpp b/source/util/util-threadpool.cpp index 5e719a42..cb67add3 100644 --- a/source/util/util-threadpool.cpp +++ b/source/util/util-threadpool.cpp @@ -1,21 +1,18 @@ -/* - * Modern effects for a modern Streamer - * Copyright (C) 2020 Michael Fabian Dirks - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -*/ +// Copyright (C) 2020-2022 Michael Fabian Dirks +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA #include "util-threadpool.hpp" #include "common.hpp" @@ -25,6 +22,14 @@ #include #include "warning-enable.hpp" +#include "warning-disable.hpp" +#if defined(D_PLATFORM_WINDOWS) +#include +#elif defined(D_PLATFORM_LINUX) +#include +#endif +#include "warning-enable.hpp" + #ifdef _DEBUG #define ST_PREFIX "<%s> " #define D_LOG_ERROR(x, ...) P_LOG_ERROR(ST_PREFIX##x, __FUNCTION_SIG__, __VA_ARGS__) @@ -39,125 +44,214 @@ #define D_LOG_DEBUG(...) P_LOG_DEBUG(ST_PREFIX __VA_ARGS__) #endif -// Most Tasks likely wait for IO, so we can use that time for other tasks. -#define ST_CONCURRENCY_MULTIPLIER 2 - -streamfx::util::threadpool::threadpool() - : _workers(), _worker_stop(false), _worker_idx(0), _tasks(), _tasks_lock(), _tasks_cv() -{ - std::size_t concurrency = static_cast(std::thread::hardware_concurrency() * ST_CONCURRENCY_MULTIPLIER); - for (std::size_t n = 0; n < concurrency; n++) { - _workers.emplace_back(std::bind(&streamfx::util::threadpool::work, this)); - } -} - -streamfx::util::threadpool::~threadpool() -{ - _worker_stop = true; - _tasks_cv.notify_all(); - for (auto& thread : _workers) { - _tasks_cv.notify_all(); - if (thread.joinable()) { - thread.join(); - } - } -} - -std::shared_ptr<::streamfx::util::threadpool::task> streamfx::util::threadpool::push(threadpool_callback_t fn, - threadpool_data_t data) -{ - auto task = std::make_shared(fn, data); - - // Append the task to the queue. - std::unique_lock lock(_tasks_lock); - _tasks.emplace_back(task); - _tasks_cv.notify_one(); - - return task; -} - -void streamfx::util::threadpool::pop(std::shared_ptr<::streamfx::util::threadpool::task> work) -{ - if (work) { - { - std::unique_lock lock(work->_mutex); - work->_is_dead = true; - } - work->_is_complete.notify_all(); - } -} - -void streamfx::util::threadpool::work() -{ - std::shared_ptr local_work{}; - uint32_t local_number = _worker_idx.fetch_add(1); - - while (!_worker_stop) { - // Wait for more work, or immediately continue if there is still work to do. - { - // Lock the tasks mutex to check for work. - std::unique_lock lock(_tasks_lock); - - // If there are currently no tasks queued, wait on the condition variable. - // This temporarily unlocks the mutex until it is woken up. - if (_tasks.size() == 0) { - _tasks_cv.wait(lock, [this]() { return _worker_stop || _tasks.size() > 0; }); - } - - // If there is either no tasks or we were asked to stop, skip everything. - if (_worker_stop || (_tasks.size() == 0)) { - continue; - } - - // Grab the latest task and immediately remove it from the queue. - local_work = _tasks.front(); - _tasks.pop_front(); - } - - // If the task was killed, skip everything again. - if (local_work->_is_dead.load()) { - continue; - } - - // Try to execute work, but don't crash on catchable exceptions. - if (local_work->_callback) { - try { - local_work->_callback(local_work->_data); - } catch (std::exception const& ex) { - D_LOG_WARNING("Worker %" PRIx32 " caught exception from task (%" PRIxPTR ", %" PRIxPTR - ") with message: %s", - local_number, reinterpret_cast(local_work->_callback.target()), - reinterpret_cast(local_work->_data.get()), ex.what()); - } catch (...) { - D_LOG_WARNING("Worker %" PRIx32 " caught exception of unknown type from task (%" PRIxPTR ", %" PRIxPTR - ").", - local_number, reinterpret_cast(local_work->_callback.target()), - reinterpret_cast(local_work->_data.get())); - } - { - std::unique_lock lock(local_work->_mutex); - local_work->_is_dead.store(true); - } - local_work->_is_complete.notify_all(); - } - - // Remove our reference to the work unit. - local_work.reset(); - } - - _worker_idx.fetch_sub(1); -} - -streamfx::util::threadpool::task::task() = default; - -streamfx::util::threadpool::task::task(threadpool_callback_t fn, threadpool_data_t dt) - : _mutex(), _is_complete(), _is_dead(false), _callback(fn), _data(dt) +streamfx::util::threadpool::task::task(task_callback_t callback, task_data_t data) + : _callback(callback), _data(data), _lock(), _status_changed(), _cancelled(false), _completed(false), _failed(false) {} +streamfx::util::threadpool::task::~task() {} + +void streamfx::util::threadpool::task::run() +{ + std::lock_guard lg(_lock); + if (!_cancelled) { + try { + _callback(_data); + } catch (const std::exception& ex) { + D_LOG_ERROR("Unhandled exception in Task: %s.", ex.what()); + _failed = false; + } catch (...) { + D_LOG_ERROR("Unhandled exception in Task.", nullptr); + _failed = true; + } + } + _completed = true; + _status_changed.notify_all(); +} + +void streamfx::util::threadpool::task::cancel() +{ + std::lock_guard lg(_lock); + _cancelled = true; + _completed = true; + _status_changed.notify_all(); +} + +bool streamfx::util::threadpool::task::is_cancelled() +{ + return _cancelled; +} + +bool streamfx::util::threadpool::task::is_completed() +{ + return _completed; +} + +bool streamfx::util::threadpool::task::has_failed() +{ + return _failed; +} + +void streamfx::util::threadpool::task::wait() +{ + std::unique_lock ul(_lock); + if (!_cancelled && !_completed && !_failed) { + _status_changed.wait(ul, + [this]() { return this->is_completed() || this->is_cancelled() || this->has_failed(); }); + } +} + void streamfx::util::threadpool::task::await_completion() { - if (!_is_dead) { - std::unique_lock lock(_mutex); - _is_complete.wait(lock, [this]() { return this->_is_dead.load(); }); + wait(); +} + +streamfx::util::threadpool::threadpool::~threadpool() +{ + { // Terminate all remaining tasks. + std::lock_guard lg(_tasks_lock); + for (auto task : _tasks) { + task->cancel(); + } + _tasks.clear(); + } + + { // Notify workers to stop working. + { + std::lock_guard lg(_workers_lock); + for (auto worker : _workers) { + worker->stop = true; + } + } + { + std::lock_guard lg(_tasks_lock); + _tasks_cv.notify_all(); + } + for (auto worker : _workers) { + std::lock_guard lg(worker->lifeline); + } + } +} + +streamfx::util::threadpool::threadpool::threadpool(size_t minimum, size_t maximum) + : _limits{minimum, maximum}, _workers_lock(), _workers(), _tasks_lock(), _tasks_cv(), _tasks() +{ + // Spawn the minimum number of threads. + spawn(_limits.first); +} + +std::shared_ptr + streamfx::util::threadpool::threadpool::push(task_callback_t callback, task_data_t data /*= nullptr*/) +{ + std::lock_guard lg(_tasks_lock); + constexpr size_t threshold = 3; + + // Enqueue the new task. + auto task = std::make_shared(callback, data); + _tasks.emplace_back(task); + + // Spawn additional workers if the number of queued tasks exceeds a threshold. + if (_tasks.size() > (threshold * _worker_count)) { + spawn(_tasks.size() / threshold); + } + + // Return handle to caller. + return task; +} + +void streamfx::util::threadpool::threadpool::pop(std::shared_ptr task) +{ + if (task) { + task->cancel(); + } + std::lock_guard lg(_tasks_lock); + _tasks.remove(task); +} + +void streamfx::util::threadpool::threadpool::spawn(size_t count) +{ + std::lock_guard lg(_workers_lock); + for (size_t n = 0; (n < count) && (_worker_count < _limits.second); n++) { + auto wi = std::make_shared(); + wi->stop = false; + wi->last_work_time = std::chrono::high_resolution_clock::now(); + wi->thread = std::thread(std::bind(&streamfx::util::threadpool::threadpool::work, this, wi)); + wi->thread.detach(); + _workers.emplace_back(wi); + ++_worker_count; + D_LOG_DEBUG("Spawning new worker thread (%zu < %zu < %zu).", _limits.first, _worker_count.load(), + _limits.second); + } +} + +bool streamfx::util::threadpool::threadpool::die(std::shared_ptr wi) +{ + constexpr std::chrono::seconds delay{1}; + + std::lock_guard lg(_workers_lock); + bool result = false; + + if (_worker_count > _limits.first) { + auto now = std::chrono::high_resolution_clock::now(); + result = ((wi->last_work_time + delay) <= now) && ((_last_worker_death + delay) <= now); + + if (result) { + _last_worker_death = now; + --_worker_count; + _workers.remove(wi); + D_LOG_DEBUG("Terminated idle worker thread (%zu < %zu < %zu).", _limits.first, _worker_count.load(), + _limits.second); + } + } + + return result; +} + +void streamfx::util::threadpool::threadpool::work(std::shared_ptr wi) +{ + std::shared_ptr task{}; + std::lock_guard lg(wi->lifeline); + +#if defined(D_PLATFORM_WINDOWS) + SetThreadPriority(GetCurrentThread(), THREAD_MODE_BACKGROUND_BEGIN | THREAD_PRIORITY_BELOW_NORMAL); + SetThreadDescription(GetCurrentThread(), L"StreamFX Worker Thread"); +#elif defined(D_PLATFORM_LINUX) + struct sched_param param; + param.sched_priority = 0; + pthread_setschedparam(pthread_self(), SCHED_IDLE, ¶m); + pthread_setname_np(pthread_self(), "StreamFX Worker Thread"); +#endif + + while (!wi->stop) { + { // Try and acquire new work. + std::unique_lock ul(_tasks_lock); + + // Is there any work available right now? + if (_tasks.size() == 0) { // If not: + // Block this thread until it is notified of a change. + _tasks_cv.wait_until( + ul, + std::chrono::time_point(std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(250)), + [this, wi]() { return wi->stop || _tasks.size() > 0; }); + } + + // If we were asked to stop, skip everything. + if (wi->stop) { + continue; + } + + // If there is work to be done, take it. + if (_tasks.size() > 0) { + wi->last_work_time = std::chrono::high_resolution_clock::now(); + task = _tasks.front(); + _tasks.pop_front(); + } else if (die(wi)) { // Is the threadpool requesting less threads? + break; + } + } + + if (task) { + task->run(); + task.reset(); + } } } diff --git a/source/util/util-threadpool.hpp b/source/util/util-threadpool.hpp index a003a170..89152ef4 100644 --- a/source/util/util-threadpool.hpp +++ b/source/util/util-threadpool.hpp @@ -1,75 +1,148 @@ -/* - * Modern effects for a modern Streamer - * Copyright (C) 2020 Michael Fabian Dirks - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA - */ +// Copyright (C) 2020-2022 Michael Fabian Dirks +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA #pragma once #include "warning-disable.hpp" #include +#include +#include #include +#include #include #include #include #include +#include #include #include #include "warning-enable.hpp" -namespace streamfx::util { - typedef std::shared_ptr threadpool_data_t; - typedef std::function threadpool_callback_t; +namespace streamfx::util::threadpool { + typedef std::shared_ptr task_data_t; + typedef std::function task_callback_t; + + struct worker_info { +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::atomic stop; + +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::mutex lifeline; + + std::chrono::high_resolution_clock::time_point last_work_time; + + std::thread thread; + }; + + class task { + task_callback_t _callback; + task_data_t _data; + std::mutex _lock; + +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::condition_variable _status_changed; +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::atomic _cancelled; +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::atomic _completed; +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::atomic _failed; + + public: + task(task_callback_t callback, task_data_t data); + + public: + ~task(); + + public: + void run(); + + public: + void cancel(); + + public: + bool is_cancelled(); + + public: + bool is_completed(); + + public: + bool has_failed(); + + public: + void wait(); + + public: + void await_completion(); + }; class threadpool { - public: - class task { - protected: - std::mutex _mutex; - std::condition_variable _is_complete; - std::atomic _is_dead; - threadpool_callback_t _callback; - threadpool_data_t _data; + std::pair _limits; - public: - task(); - task(threadpool_callback_t callback_function, threadpool_data_t data); +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::mutex _workers_lock; + std::list> _workers; +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::atomic _worker_count; + std::chrono::high_resolution_clock::time_point _last_worker_death; - void await_completion(); - - friend class streamfx::util::threadpool; - }; - - private: - std::list _workers; - std::atomic _worker_stop; - std::atomic _worker_idx; - std::list> _tasks; - std::mutex _tasks_lock; - std::condition_variable _tasks_cv; +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::mutex _tasks_lock; +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::condition_variable _tasks_cv; + std::list> _tasks; public: - threadpool(); ~threadpool(); - std::shared_ptr<::streamfx::util::threadpool::task> push(threadpool_callback_t callback_function, - threadpool_data_t data); + public: + threadpool(size_t minimum = 2, size_t maximum = std::thread::hardware_concurrency()); - void pop(std::shared_ptr<::streamfx::util::threadpool::task> work); + public: + std::shared_ptr push(task_callback_t callback, task_data_t data = nullptr); + + public: + void pop(std::shared_ptr task); private: - void work(); + void spawn(size_t count = 1); + + private: + bool die(std::shared_ptr); + + private: + void work(std::shared_ptr); }; -} // namespace streamfx::util +} // namespace streamfx::util::threadpool