Removed atomicity (not necessary) and renamed some variables.

This commit is contained in:
Samer Afach 2017-06-06 19:31:23 +02:00
parent 0e45acf8ce
commit 375d5a38b1

View File

@ -1,7 +1,6 @@
#ifndef THREADPOOL_H #ifndef THREADPOOL_H
#define THREADPOOL_H #define THREADPOOL_H
#include <atomic>
#include <deque> #include <deque>
#include <functional> #include <functional>
#include <mutex> #include <mutex>
@ -12,16 +11,16 @@ class ThreadPool {
long numOfThreads; long numOfThreads;
std::deque<std::function<void()>> _tasks; std::deque<std::function<void()>> _tasks;
std::mutex _queueLock; std::mutex _queueLock;
std::atomic<bool> stopped{false}; bool conclude_work = false;
bool startedAlready = false; bool started_already = false;
std::condition_variable _queueCond; std::condition_variable _queueCond;
std::condition_variable _threadFinishedCond; std::condition_variable _threadFinishedCond;
long numOfThreadsRunning = 0; long num_of_threads_running = 0;
std::vector<std::thread> _threads; std::vector<std::thread> _threads;
protected: protected:
inline void threadWorker(); inline void thread_worker();
inline void joinAll(); inline void join_all();
public: public:
inline ThreadPool(); inline ThreadPool();
@ -32,10 +31,10 @@ public:
inline void finish(); inline void finish();
}; };
void ThreadPool::threadWorker() { void ThreadPool::thread_worker() {
while (true) { while (true) {
std::unique_lock<decltype(_queueLock)> lg(_queueLock); std::unique_lock<decltype(_queueLock)> lg(_queueLock);
while (_tasks.empty() && !stopped.load()) { while (_tasks.empty() && !conclude_work) {
_queueCond.wait(lg); _queueCond.wait(lg);
} }
if (!_tasks.empty()) { if (!_tasks.empty()) {
@ -45,15 +44,15 @@ void ThreadPool::threadWorker() {
theTask(); theTask();
lg.lock(); lg.lock();
} }
if (_tasks.empty() && stopped.load()) { if (_tasks.empty() && conclude_work) {
numOfThreadsRunning--; num_of_threads_running--;
_threadFinishedCond.notify_one(); _threadFinishedCond.notify_one();
break; break;
} }
} }
} }
void ThreadPool::joinAll() { void ThreadPool::join_all() {
for (long i = 0; i < numOfThreads; i++) { for (long i = 0; i < numOfThreads; i++) {
if (_threads[i].joinable()) if (_threads[i].joinable())
_threads[i].join(); _threads[i].join();
@ -71,27 +70,28 @@ void ThreadPool::push(const std::function<void()> &task) {
} }
void ThreadPool::start(const long NumOfThreads) { void ThreadPool::start(const long NumOfThreads) {
if (!startedAlready) if (!started_already)
startedAlready = true; started_already = true;
else else
throw std::logic_error("You cannot start the thread pool multiple times."); throw std::logic_error("You cannot start the thread pool multiple times.");
numOfThreads = NumOfThreads; numOfThreads = NumOfThreads;
_threads.reserve(numOfThreads); _threads.reserve(numOfThreads);
for (long i = 0; i < numOfThreads; i++) { for (long i = 0; i < numOfThreads; i++) {
_threads.push_back(std::thread(std::bind(&ThreadPool::threadWorker, this))); _threads.push_back(
numOfThreadsRunning++; std::thread(std::bind(&ThreadPool::thread_worker, this)));
num_of_threads_running++;
} }
} }
void ThreadPool::finish() { void ThreadPool::finish() {
std::unique_lock<decltype(_queueLock)> lg(_queueLock); std::unique_lock<decltype(_queueLock)> lg(_queueLock);
stopped.store(true); conclude_work = true;
_queueCond.notify_all(); _queueCond.notify_all();
while (numOfThreadsRunning > 0) { while (num_of_threads_running > 0) {
_threadFinishedCond.wait(lg); _threadFinishedCond.wait(lg);
} }
lg.unlock(); lg.unlock();
joinAll(); join_all();
} }
#endif // THREADPOOL_H #endif // THREADPOOL_H