util/threadpool: Allow waiting for task completion

This commit is contained in:
Michael Fabian 'Xaymar' Dirks 2021-10-25 21:56:49 +02:00
parent 0e5d0fe4e7
commit 97318eacac
2 changed files with 29 additions and 7 deletions

View file

@ -65,6 +65,7 @@ std::shared_ptr<::streamfx::util::threadpool::task> streamfx::util::threadpool::
{
auto task = std::make_shared<streamfx::util::threadpool::task>(fn, data);
// Append the task to the queue.
std::unique_lock<std::mutex> lock(_tasks_lock);
_tasks.emplace_back(task);
_tasks_cv.notify_one();
@ -75,7 +76,11 @@ std::shared_ptr<::streamfx::util::threadpool::task> streamfx::util::threadpool::
void streamfx::util::threadpool::pop(std::shared_ptr<::streamfx::util::threadpool::task> work)
{
if (work) {
work->_is_dead.store(true);
{
std::unique_lock<std::mutex> lock(work->_mutex);
work->_is_dead = true;
}
work->_is_complete.notify_all();
}
}
@ -107,7 +112,7 @@ void streamfx::util::threadpool::work()
}
// If the task was killed, skip everything again.
if (local_work->_is_dead) {
if (local_work->_is_dead.load()) {
continue;
}
@ -126,6 +131,11 @@ void streamfx::util::threadpool::work()
local_number, reinterpret_cast<ptrdiff_t>(local_work->_callback.target<void>()),
reinterpret_cast<ptrdiff_t>(local_work->_data.get()));
}
{
std::unique_lock<std::mutex> lock(local_work->_mutex);
local_work->_is_dead.store(true);
}
local_work->_is_complete.notify_all();
}
// Remove our reference to the work unit.
@ -138,5 +148,13 @@ void streamfx::util::threadpool::work()
streamfx::util::threadpool::task::task() {}
streamfx::util::threadpool::task::task(threadpool_callback_t fn, threadpool_data_t dt)
: _is_dead(false), _callback(fn), _data(dt)
: _mutex(), _is_complete(), _is_dead(false), _callback(fn), _data(dt)
{}
void streamfx::util::threadpool::task::await_completion()
{
if (!_is_dead) {
std::unique_lock<std::mutex> lock(_mutex);
_is_complete.wait(lock, [this]() { return this->_is_dead.load(); });
}
}

View file

@ -35,20 +35,24 @@ namespace streamfx::util {
public:
class task {
protected:
std::atomic_bool _is_dead;
threadpool_callback_t _callback;
threadpool_data_t _data;
std::mutex _mutex;
std::condition_variable _is_complete;
std::atomic<bool> _is_dead;
threadpool_callback_t _callback;
threadpool_data_t _data;
public:
task();
task(threadpool_callback_t callback_function, threadpool_data_t data);
void await_completion();
friend class streamfx::util::threadpool;
};
private:
std::list<std::thread> _workers;
std::atomic_bool _worker_stop;
std::atomic<bool> _worker_stop;
std::atomic<uint32_t> _worker_idx;
std::list<std::shared_ptr<::streamfx::util::threadpool::task>> _tasks;
std::mutex _tasks_lock;