StdThreadPool/include/ThreadPool.h

99 lines
2.3 KiB
C++

#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>
/// A pool of worker threads that execute queued `void()` tasks.
///
/// Workers are spawned by start(), tasks are appended with push() and taken
/// from the front of the queue, and finish() (also called by the destructor)
/// requests shutdown and blocks until the queue is empty and every worker
/// has exited.
class ThreadPool {
  /// Number of worker threads; zero until start() has been called, so that
  /// joinAll() never loops over an indeterminate count.
  int numOfThreads = 0;
  /// Pending tasks: push() appends at the back, workers pop from the front.
  std::deque<std::function<void()>> _tasks;
  /// Guards _tasks and is the mutex used with both condition variables.
  std::mutex _queueLock;
  /// Set by finish(); tells workers to exit once the queue is empty.
  std::atomic<bool> stopped{false};
  /// True once start() has been called; start() may only be called once.
  bool startedAlready = false;
  /// Signalled when a task is pushed or shutdown is requested.
  std::condition_variable _queueCond;
  /// Signalled by a worker right before it leaves its loop.
  std::condition_variable _threadFinishedCond;
  /// Number of workers that have been started and have not yet exited.
  std::atomic_int numOfThreadsRunning{0};
  /// The worker thread handles, joined by joinAll().
  std::vector<std::thread> _threads;

protected:
  /// Body of every worker thread: runs tasks until shutdown and empty queue.
  inline void threadWorker();
  /// Joins every joinable worker thread.
  inline void joinAll();

public:
  /// Creates an idle pool; no threads exist until start() is called.
  inline ThreadPool();
  /// Calls finish(), i.e. drains the queue and joins the workers.
  inline ~ThreadPool();
  /// Appends a copy of `task` to the queue and wakes one waiting worker.
  inline void push(const std::function<void()> &task);
  /// Spawns `NumOfThreads` workers. Throws std::logic_error when called more
  /// than once. Note that std::thread::hardware_concurrency() is allowed to
  /// return 0, in which case no workers are created.
  inline void
  start(const int NumOfThreads = std::thread::hardware_concurrency());
  /// Requests shutdown, waits for all workers to exit, then joins them.
  inline void finish();
};
/// Worker loop executed by every pool thread.
///
/// Repeatedly waits for a task (or for shutdown), runs one task with the
/// queue mutex released, and exits once shutdown has been requested and the
/// queue is empty. On exit it decrements numOfThreadsRunning and signals
/// _threadFinishedCond so that finish() can observe the change.
void ThreadPool::threadWorker() {
  while (true) {
    std::unique_lock<decltype(_queueLock)> lg(_queueLock);
    // Sleep until there is work or shutdown was requested; the loop
    // re-checks the condition after every wakeup, including spurious ones.
    while (_tasks.empty() && !stopped.load()) {
      _queueCond.wait(lg);
    }
    if (!_tasks.empty()) {
      {
        // Move the task out of the queue instead of copying it: copying a
        // std::function duplicates its captured state for no benefit.
        auto theTask = std::move(_tasks.front());
        _tasks.pop_front();
        // Run the task without holding the queue mutex so other workers
        // and push() can make progress meanwhile.
        lg.unlock();
        theTask();
        // theTask is destroyed at the end of this scope, i.e. before the
        // mutex is re-acquired, so destructors of captured state never run
        // under the queue lock.
      }
      lg.lock();
    }
    if (_tasks.empty() && stopped.load()) {
      // Still holding the mutex here, so finish() cannot miss the update
      // between its check of the counter and its wait.
      numOfThreadsRunning--;
      _threadFinishedCond.notify_one();
      break;
    }
  }
}
/// Joins every worker thread that is still joinable.
///
/// Iterates over the threads that actually exist in _threads rather than
/// trusting numOfThreads, which is not meaningful before start() has run
/// and could exceed _threads.size() if thread creation failed part-way.
void ThreadPool::joinAll() {
  for (std::thread &worker : _threads) {
    if (worker.joinable())
      worker.join();
  }
}
/// Creates an idle pool with no worker threads. numOfThreads is set to zero
/// explicitly so that it holds a defined value even if start() is never
/// called before the pool is finished or destroyed.
ThreadPool::ThreadPool() : numOfThreads(0) {}
/// Shuts the pool down on destruction by delegating to finish().
ThreadPool::~ThreadPool() {
  finish();
}
/// Enqueues a copy of `task` at the back of the queue and wakes one worker.
void ThreadPool::push(const std::function<void()> &task) {
  {
    // Hold the queue mutex only for the insertion itself.
    std::lock_guard<std::mutex> guard(_queueLock);
    _tasks.push_back(task);
  }
  // The mutex is already released when the waiting worker is notified.
  _queueCond.notify_one();
}
void ThreadPool::start(const int NumOfThreads) {
if (!startedAlready)
startedAlready = true;
else
throw std::logic_error("You cannot start the thread pool multiple times.");
numOfThreads = NumOfThreads;
_threads.reserve(numOfThreads);
for (long i = 0; i < numOfThreads; i++) {
_threads.push_back(std::thread(std::bind(&ThreadPool::threadWorker, this)));
numOfThreadsRunning++;
}
}
void ThreadPool::finish() {
std::unique_lock<decltype(_queueLock)> lg(_queueLock);
stopped.store(true);
_queueCond.notify_all();
while (numOfThreadsRunning > 0) {
_threadFinishedCond.wait(lg);
}
lg.unlock();
joinAll();
}
#endif // THREADPOOL_H