util/threadpool: Formatting and allow cancelling of tasks

This commit is contained in:
Michael Fabian 'Xaymar' Dirks 2020-05-10 02:50:52 +02:00
parent 9dacdae66f
commit 0eddfd75ae
2 changed files with 74 additions and 19 deletions

View file

@ -25,7 +25,7 @@
util::threadpool::threadpool() : _workers(), _worker_stop(false), _tasks(), _tasks_lock(), _tasks_cv() util::threadpool::threadpool() : _workers(), _worker_stop(false), _tasks(), _tasks_lock(), _tasks_cv()
{ {
std::size_t concurrency = static_cast<size_t>(std::thread::hardware_concurrency()) * CONCURRENCY_MULTIPLIER; std::size_t concurrency = static_cast<size_t>(std::thread::hardware_concurrency() * CONCURRENCY_MULTIPLIER);
for (std::size_t n = 0; n < concurrency; n++) { for (std::size_t n = 0; n < concurrency; n++) {
_workers.emplace_back(std::bind(&util::threadpool::work, this)); _workers.emplace_back(std::bind(&util::threadpool::work, this));
} }
@ -37,36 +37,73 @@ util::threadpool::~threadpool()
_tasks_cv.notify_all(); _tasks_cv.notify_all();
for (auto& thread : _workers) { for (auto& thread : _workers) {
_tasks_cv.notify_all(); _tasks_cv.notify_all();
if (thread.joinable()) if (thread.joinable()) {
thread.join(); thread.join();
}
} }
} }
void util::threadpool::push(threadpool_function_t fn, std::shared_ptr<void> data) std::shared_ptr<::util::threadpool::task> util::threadpool::push(threadpool_callback_t fn, threadpool_data_t data)
{ {
auto task = std::make_shared<util::threadpool::task>(fn, data);
std::unique_lock<std::mutex> lock(_tasks_lock); std::unique_lock<std::mutex> lock(_tasks_lock);
_tasks.emplace_back(fn, data); _tasks.emplace_back(task);
_tasks_cv.notify_one(); _tasks_cv.notify_one();
return task;
}
void util::threadpool::pop(std::shared_ptr<::util::threadpool::task> work)
{
if (work) {
work->_is_dead.store(true);
}
} }
void util::threadpool::work() void util::threadpool::work()
{ {
while (!_worker_stop) { static thread_local std::shared_ptr<util::threadpool::task> local_work{};
std::pair<threadpool_function_t, std::shared_ptr<void>> work;
while (!_worker_stop) {
// Wait for more work, or immediately continue if there is still work to do. // Wait for more work, or immediately continue if there is still work to do.
{ {
// Lock the tasks mutex to check for work.
std::unique_lock<std::mutex> lock(_tasks_lock); std::unique_lock<std::mutex> lock(_tasks_lock);
if (_tasks.size() == 0)
// If there are currently no tasks queued, wait on the condition variable.
// This temporarily unlocks the mutex until it is woken up.
if (_tasks.size() == 0) {
_tasks_cv.wait(lock, [this]() { return _worker_stop || _tasks.size() > 0; }); _tasks_cv.wait(lock, [this]() { return _worker_stop || _tasks.size() > 0; });
if (_worker_stop || (_tasks.size() == 0)) }
// If there is either no tasks or we were asked to stop, skip everything.
if (_worker_stop || (_tasks.size() == 0)) {
continue; continue;
work = _tasks.front(); }
// Grab the latest task and immediately remove it from the queue.
local_work = _tasks.front();
_tasks.pop_front(); _tasks.pop_front();
} }
// Execute work. // If the task was killed, skip everything again.
if (work.first) if (local_work->_is_dead.load()) {
work.first(work.second); continue;
}
// Try to execute work, but don't crash on catchable exceptions.
if (local_work->_callback) {
try {
local_work->_callback(local_work->_data);
} catch (std::exception const& ex) {
} catch (...) { // This does not catch unrecoverable exceptions, or exceptions that went through C code.
}
}
} }
} }
util::threadpool::task::task() {}
util::threadpool::task::task(threadpool_callback_t fn, threadpool_data_t dt) : _callback(fn), _data(dt), _is_dead(false)
{}

View file

@ -28,20 +28,38 @@
#include <thread> #include <thread>
namespace util { namespace util {
typedef std::function<void(std::shared_ptr<void>)> threadpool_function_t; typedef std::shared_ptr<void> threadpool_data_t;
typedef std::function<void(threadpool_data_t)> threadpool_callback_t;
class threadpool { class threadpool {
std::list<std::thread> _workers; public:
std::atomic_bool _worker_stop; class task {
std::list<std::pair<threadpool_function_t, std::shared_ptr<void>>> _tasks; protected:
std::mutex _tasks_lock; std::atomic_bool _is_dead;
std::condition_variable _tasks_cv; threadpool_callback_t _callback;
threadpool_data_t _data;
public:
task();
task(threadpool_callback_t callback_function, threadpool_data_t data);
friend class util::threadpool;
};
private:
std::list<std::thread> _workers;
std::atomic_bool _worker_stop;
std::list<std::shared_ptr<::util::threadpool::task>> _tasks;
std::mutex _tasks_lock;
std::condition_variable _tasks_cv;
public: public:
threadpool(); threadpool();
~threadpool(); ~threadpool();
void push(threadpool_function_t fn, std::shared_ptr<void> data); std::shared_ptr<::util::threadpool::task> push(threadpool_callback_t callback_function, threadpool_data_t data);
void pop(std::shared_ptr<::util::threadpool::task> work);
private: private:
void work(); void work();