#ifndef THREADPOOL_H #define THREADPOOL_H #include #include #include #include #include #include class ThreadPool { long numOfThreads; std::deque> _tasks; std::mutex _queueLock; std::atomic stopped{false}; bool startedAlready = false; std::condition_variable _queueCond; std::condition_variable _threadFinishedCond; long numOfThreadsRunning = 0; std::vector _threads; protected: inline void threadWorker(); inline void joinAll(); public: inline ThreadPool(); inline ~ThreadPool(); inline void push(const std::function &task); inline void start(const long NumOfThreads = std::thread::hardware_concurrency()); inline void finish(); }; void ThreadPool::threadWorker() { while (true) { std::unique_lock lg(_queueLock); while (_tasks.empty() && !stopped.load()) { _queueCond.wait(lg); } if (!_tasks.empty()) { auto theTask = _tasks[0]; _tasks.pop_front(); lg.unlock(); theTask(); lg.lock(); } if (_tasks.empty() && stopped.load()) { numOfThreadsRunning--; _threadFinishedCond.notify_one(); break; } } } void ThreadPool::joinAll() { for (long i = 0; i < numOfThreads; i++) { if (_threads[i].joinable()) _threads[i].join(); } } ThreadPool::ThreadPool() {} ThreadPool::~ThreadPool() { this->finish(); } void ThreadPool::push(const std::function &task) { std::unique_lock lg(_queueLock); _tasks.push_back(task); _queueCond.notify_one(); } void ThreadPool::start(const long NumOfThreads) { if (!startedAlready) startedAlready = true; else throw std::logic_error("You cannot start the thread pool multiple times."); numOfThreads = NumOfThreads; _threads.reserve(numOfThreads); for (long i = 0; i < numOfThreads; i++) { _threads.push_back(std::thread(std::bind(&ThreadPool::threadWorker, this))); numOfThreadsRunning++; } } void ThreadPool::finish() { std::unique_lock lg(_queueLock); stopped.store(true); _queueCond.notify_all(); while (numOfThreadsRunning > 0) { _threadFinishedCond.wait(lg); } lg.unlock(); joinAll(); } #endif // THREADPOOL_H