mirror of
https://github.com/Xaymar/obs-StreamFX
synced 2024-12-29 11:01:23 +00:00
263 lines
7.7 KiB
C++
263 lines
7.7 KiB
C++
// AUTOGENERATED COPYRIGHT HEADER START
|
|
// Copyright (C) 2020-2023 Michael Fabian 'Xaymar' Dirks <info@xaymar.com>
|
|
// Copyright (C) 2022 lainon <GermanAizek@yandex.ru>
|
|
// Copyright (C) 2023 tt2468 <tt2468@gmail.com>
|
|
// AUTOGENERATED COPYRIGHT HEADER END
|
|
|
|
#include "util-threadpool.hpp"
|
|
#include "common.hpp"
|
|
#include "plugin.hpp"
|
|
#include "util/util-logging.hpp"
|
|
|
|
#include "warning-disable.hpp"
|
|
#include <cstddef>
|
|
#include "warning-enable.hpp"
|
|
|
|
#include "warning-disable.hpp"
|
|
#if defined(D_PLATFORM_WINDOWS)
|
|
#include <Windows.h>
|
|
#elif defined(D_PLATFORM_LINUX)
|
|
#include <pthread.h>
|
|
#endif
|
|
#include "warning-enable.hpp"
|
|
|
|
#ifdef _DEBUG
|
|
#define ST_PREFIX "<%s> "
|
|
#define D_LOG_ERROR(x, ...) P_LOG_ERROR(ST_PREFIX##x, __FUNCTION_SIG__, __VA_ARGS__)
|
|
#define D_LOG_WARNING(x, ...) P_LOG_WARN(ST_PREFIX##x, __FUNCTION_SIG__, __VA_ARGS__)
|
|
#define D_LOG_INFO(x, ...) P_LOG_INFO(ST_PREFIX##x, __FUNCTION_SIG__, __VA_ARGS__)
|
|
#define D_LOG_DEBUG(x, ...) P_LOG_DEBUG(ST_PREFIX##x, __FUNCTION_SIG__, __VA_ARGS__)
|
|
#else
|
|
#define ST_PREFIX "<util::threadpool> "
|
|
#define D_LOG_ERROR(...) P_LOG_ERROR(ST_PREFIX __VA_ARGS__)
|
|
#define D_LOG_WARNING(...) P_LOG_WARN(ST_PREFIX __VA_ARGS__)
|
|
#define D_LOG_INFO(...) P_LOG_INFO(ST_PREFIX __VA_ARGS__)
|
|
#define D_LOG_DEBUG(...) P_LOG_DEBUG(ST_PREFIX __VA_ARGS__)
|
|
#endif
|
|
|
|
streamfx::util::threadpool::task::task(task_callback_t callback, task_data_t data) : _callback(callback), _data(data), _lock(), _status_changed(), _cancelled(false), _completed(false), _failed(false) {}
|
|
|
|
streamfx::util::threadpool::task::~task() {}
|
|
|
|
void streamfx::util::threadpool::task::run()
|
|
{
|
|
std::lock_guard<std::mutex> lg(_lock);
|
|
if (!_cancelled) {
|
|
try {
|
|
_callback(_data);
|
|
} catch (const std::exception& ex) {
|
|
D_LOG_ERROR("Unhandled exception in Task: %s.", ex.what());
|
|
_failed = false;
|
|
} catch (...) {
|
|
D_LOG_ERROR("Unhandled exception in Task.", nullptr);
|
|
_failed = true;
|
|
}
|
|
}
|
|
_completed = true;
|
|
_status_changed.notify_all();
|
|
}
|
|
|
|
void streamfx::util::threadpool::task::cancel()
|
|
{
|
|
std::lock_guard<std::mutex> lg(_lock);
|
|
_cancelled = true;
|
|
_completed = true;
|
|
_status_changed.notify_all();
|
|
}
|
|
|
|
bool streamfx::util::threadpool::task::is_cancelled()
|
|
{
|
|
return _cancelled;
|
|
}
|
|
|
|
bool streamfx::util::threadpool::task::is_completed()
|
|
{
|
|
return _completed;
|
|
}
|
|
|
|
bool streamfx::util::threadpool::task::has_failed()
|
|
{
|
|
return _failed;
|
|
}
|
|
|
|
void streamfx::util::threadpool::task::wait()
|
|
{
|
|
std::unique_lock<std::mutex> ul(_lock);
|
|
if (!_cancelled && !_completed && !_failed) {
|
|
_status_changed.wait(ul, [this]() { return this->is_completed() || this->is_cancelled() || this->has_failed(); });
|
|
}
|
|
}
|
|
|
|
void streamfx::util::threadpool::task::await_completion()
|
|
{
|
|
wait();
|
|
}
|
|
|
|
streamfx::util::threadpool::threadpool::~threadpool()
|
|
{
|
|
{ // Terminate all remaining tasks.
|
|
std::lock_guard<std::mutex> lg(_tasks_lock);
|
|
for (auto task : _tasks) {
|
|
task->cancel();
|
|
}
|
|
_tasks.clear();
|
|
}
|
|
|
|
{ // Notify workers to stop working.
|
|
{
|
|
std::lock_guard<std::mutex> lg(_workers_lock);
|
|
for (auto worker : _workers) {
|
|
worker->stop = true;
|
|
}
|
|
}
|
|
{
|
|
std::lock_guard<std::mutex> lg(_tasks_lock);
|
|
_tasks_cv.notify_all();
|
|
}
|
|
for (auto worker : _workers) {
|
|
std::lock_guard<std::mutex> lg(worker->lifeline);
|
|
}
|
|
}
|
|
}
|
|
|
|
streamfx::util::threadpool::threadpool::threadpool(size_t minimum, size_t maximum) : _limits{minimum, maximum}, _workers_lock(), _worker_count(0), _workers(), _tasks_lock(), _tasks_cv(), _tasks()
|
|
{
|
|
// Spawn the minimum number of threads.
|
|
spawn(_limits.first);
|
|
}
|
|
|
|
std::shared_ptr<streamfx::util::threadpool::task> streamfx::util::threadpool::threadpool::push(task_callback_t callback, task_data_t data /*= nullptr*/)
|
|
{
|
|
std::lock_guard<std::mutex> lg(_tasks_lock);
|
|
constexpr size_t threshold = 3;
|
|
|
|
// Enqueue the new task.
|
|
auto task = std::make_shared<streamfx::util::threadpool::task>(callback, data);
|
|
_tasks.emplace_back(task);
|
|
|
|
// Spawn additional workers if the number of queued tasks exceeds a threshold.
|
|
if (_tasks.size() > (threshold * _worker_count)) {
|
|
spawn(_tasks.size() / threshold);
|
|
}
|
|
|
|
// Return handle to caller.
|
|
return task;
|
|
}
|
|
|
|
void streamfx::util::threadpool::threadpool::pop(std::shared_ptr<task> task)
|
|
{
|
|
if (task) {
|
|
task->cancel();
|
|
}
|
|
std::lock_guard<std::mutex> lg(_tasks_lock);
|
|
_tasks.remove(task);
|
|
}
|
|
|
|
void streamfx::util::threadpool::threadpool::spawn(size_t count)
|
|
{
|
|
std::lock_guard<std::mutex> lg(_workers_lock);
|
|
for (size_t n = 0; (n < count) && (_worker_count < _limits.second); n++) {
|
|
auto wi = std::make_shared<worker_info>();
|
|
wi->stop = false;
|
|
wi->last_work_time = std::chrono::high_resolution_clock::now();
|
|
wi->thread = std::thread(std::bind(&streamfx::util::threadpool::threadpool::work, this, wi));
|
|
wi->thread.detach();
|
|
_workers.emplace_back(wi);
|
|
++_worker_count;
|
|
D_LOG_DEBUG("Spawning new worker thread (%zu < %zu < %zu).", _limits.first, _worker_count.load(), _limits.second);
|
|
}
|
|
}
|
|
|
|
bool streamfx::util::threadpool::threadpool::die(std::shared_ptr<worker_info> wi)
|
|
{
|
|
constexpr std::chrono::seconds delay{1};
|
|
|
|
std::lock_guard<std::mutex> lg(_workers_lock);
|
|
bool result = false;
|
|
|
|
if (_worker_count > _limits.first) {
|
|
auto now = std::chrono::high_resolution_clock::now();
|
|
result = ((wi->last_work_time + delay) <= now) && ((_last_worker_death + delay) <= now);
|
|
|
|
if (result) {
|
|
_last_worker_death = now;
|
|
--_worker_count;
|
|
_workers.remove(wi);
|
|
D_LOG_DEBUG("Terminated idle worker thread (%zu < %zu < %zu).", _limits.first, _worker_count.load(), _limits.second);
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
void streamfx::util::threadpool::threadpool::work(std::shared_ptr<worker_info> wi)
|
|
{
|
|
std::shared_ptr<streamfx::util::threadpool::task> task{};
|
|
std::lock_guard<std::mutex> lg(wi->lifeline);
|
|
|
|
#if defined(D_PLATFORM_WINDOWS)
|
|
SetThreadPriority(GetCurrentThread(), THREAD_MODE_BACKGROUND_BEGIN | THREAD_PRIORITY_BELOW_NORMAL);
|
|
SetThreadDescription(GetCurrentThread(), L"StreamFX Worker Thread");
|
|
#elif defined(D_PLATFORM_LINUX)
|
|
struct sched_param param;
|
|
param.sched_priority = 0;
|
|
pthread_setschedparam(pthread_self(), SCHED_IDLE, ¶m);
|
|
pthread_setname_np(pthread_self(), "StreamFX Worker Thread");
|
|
#endif
|
|
|
|
while (!wi->stop) {
|
|
{ // Try and acquire new work.
|
|
std::unique_lock<std::mutex> ul(_tasks_lock);
|
|
|
|
// Is there any work available right now?
|
|
if (_tasks.size() == 0) { // If not:
|
|
// Block this thread until it is notified of a change.
|
|
_tasks_cv.wait_until(ul, std::chrono::time_point(std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(250)), [this, wi]() { return wi->stop || _tasks.size() > 0; });
|
|
}
|
|
|
|
// If we were asked to stop, skip everything.
|
|
if (wi->stop) {
|
|
continue;
|
|
}
|
|
|
|
// If there is work to be done, take it.
|
|
if (_tasks.size() > 0) {
|
|
wi->last_work_time = std::chrono::high_resolution_clock::now();
|
|
task = _tasks.front();
|
|
_tasks.pop_front();
|
|
} else if (die(wi)) { // Is the threadpool requesting less threads?
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (task) {
|
|
task->run();
|
|
task.reset();
|
|
}
|
|
}
|
|
}
|
|
|
|
std::shared_ptr<streamfx::util::threadpool::threadpool> streamfx::util::threadpool::threadpool::instance()
|
|
{
|
|
static std::weak_ptr<streamfx::util::threadpool::threadpool> winst;
|
|
static std::mutex mtx;
|
|
|
|
std::unique_lock<decltype(mtx)> lock(mtx);
|
|
auto instance = winst.lock();
|
|
if (!instance) {
|
|
instance = std::shared_ptr<streamfx::util::threadpool::threadpool>(new streamfx::util::threadpool::threadpool());
|
|
winst = instance;
|
|
}
|
|
return instance;
|
|
};
|
|
|
|
static std::shared_ptr<streamfx::util::threadpool::threadpool> loader_instance;
|
|
|
|
static auto loader = streamfx::loader(
|
|
[]() { // Initalizer
|
|
loader_instance = streamfx::util::threadpool::threadpool::instance();
|
|
},
|
|
[]() { // Finalizer
|
|
loader_instance.reset();
|
|
},
|
|
streamfx::loader_priority::HIGHEST);
|