diff --git a/source/util-threadpool.cpp b/source/util-threadpool.cpp index 8d710afb..a3f4ae12 100644 --- a/source/util-threadpool.cpp +++ b/source/util-threadpool.cpp @@ -25,7 +25,7 @@ util::threadpool::threadpool() : _workers(), _worker_stop(false), _tasks(), _tasks_lock(), _tasks_cv() { - std::size_t concurrency = static_cast(std::thread::hardware_concurrency()) * CONCURRENCY_MULTIPLIER; + std::size_t concurrency = static_cast(std::thread::hardware_concurrency() * CONCURRENCY_MULTIPLIER); for (std::size_t n = 0; n < concurrency; n++) { _workers.emplace_back(std::bind(&util::threadpool::work, this)); } @@ -37,36 +37,73 @@ util::threadpool::~threadpool() _tasks_cv.notify_all(); for (auto& thread : _workers) { _tasks_cv.notify_all(); - if (thread.joinable()) + if (thread.joinable()) { thread.join(); + } } } -void util::threadpool::push(threadpool_function_t fn, std::shared_ptr data) +std::shared_ptr<::util::threadpool::task> util::threadpool::push(threadpool_callback_t fn, threadpool_data_t data) { + auto task = std::make_shared(fn, data); + std::unique_lock lock(_tasks_lock); - _tasks.emplace_back(fn, data); + _tasks.emplace_back(task); _tasks_cv.notify_one(); + + return task; +} + +void util::threadpool::pop(std::shared_ptr<::util::threadpool::task> work) +{ + if (work) { + work->_is_dead.store(true); + } } void util::threadpool::work() { - while (!_worker_stop) { - std::pair> work; + static thread_local std::shared_ptr local_work{}; + while (!_worker_stop) { // Wait for more work, or immediately continue if there is still work to do. { + // Lock the tasks mutex to check for work. std::unique_lock lock(_tasks_lock); - if (_tasks.size() == 0) + + // If there are currently no tasks queued, wait on the condition variable. + // This temporarily unlocks the mutex until it is woken up. + if (_tasks.size() == 0) { _tasks_cv.wait(lock, [this]() { return _worker_stop || _tasks.size() > 0; }); - if (_worker_stop || (_tasks.size() == 0)) + } + + // If there is either no tasks or we were asked to stop, skip everything. + if (_worker_stop || (_tasks.size() == 0)) { continue; - work = _tasks.front(); + } + + // Grab the latest task and immediately remove it from the queue. + local_work = _tasks.front(); _tasks.pop_front(); } - // Execute work. - if (work.first) - work.first(work.second); + // If the task was killed, skip everything again. + if (local_work->_is_dead.load()) { + continue; + } + + // Try to execute work, but don't crash on catchable exceptions. + if (local_work->_callback) { + try { + local_work->_callback(local_work->_data); + } catch (std::exception const& ex) { + } catch (...) { // This does not catch unrecoverable exceptions, or exceptions that went through C code. + } + } } } + +util::threadpool::task::task() {} + +util::threadpool::task::task(threadpool_callback_t fn, threadpool_data_t dt) : _callback(fn), _data(dt), _is_dead(false) +{} diff --git a/source/util-threadpool.hpp b/source/util-threadpool.hpp index 73562225..428d9c12 100644 --- a/source/util-threadpool.hpp +++ b/source/util-threadpool.hpp @@ -28,20 +28,38 @@ #include namespace util { - typedef std::function)> threadpool_function_t; + typedef std::shared_ptr threadpool_data_t; + typedef std::function threadpool_callback_t; class threadpool { - std::list _workers; - std::atomic_bool _worker_stop; - std::list>> _tasks; - std::mutex _tasks_lock; - std::condition_variable _tasks_cv; + public: + class task { + protected: + std::atomic_bool _is_dead; + threadpool_callback_t _callback; + threadpool_data_t _data; + + public: + task(); + task(threadpool_callback_t callback_function, threadpool_data_t data); + + friend class util::threadpool; + }; + + private: + std::list _workers; + std::atomic_bool _worker_stop; + std::list> _tasks; + std::mutex _tasks_lock; + std::condition_variable _tasks_cv; public: threadpool(); ~threadpool(); - void push(threadpool_function_t fn, std::shared_ptr data); + std::shared_ptr<::util::threadpool::task> push(threadpool_callback_t callback_function, threadpool_data_t data); + + void pop(std::shared_ptr<::util::threadpool::task> work); private: void work();