diff --git a/.gitignore b/.gitignore index fab7372..1358d83 100644 --- a/.gitignore +++ b/.gitignore @@ -1,73 +1,73 @@ -# This file is used to ignore files which are generated -# ---------------------------------------------------------------------------- - -*~ -*.autosave -*.a -*.core -*.moc -*.o -*.obj -*.orig -*.rej -*.so -*.so.* -*_pch.h.cpp -*_resource.rc -*.qm -.#* -*.*# -core -!core/ -tags -.DS_Store -.directory -*.debug -Makefile* -*.prl -*.app -moc_*.cpp -ui_*.h -qrc_*.cpp -Thumbs.db -*.res -*.rc -/.qmake.cache -/.qmake.stash - -# qtcreator generated files -*.pro.user* - -# xemacs temporary files -*.flc - -# Vim temporary files -.*.swp - -# Visual Studio generated files -*.ib_pdb_index -*.idb -*.ilk -*.pdb -*.sln -*.suo -*.vcproj -*vcproj.*.*.user -*.ncb -*.sdf -*.opensdf -*.vcxproj -*vcxproj.* - -# MinGW generated files -*.Debug -*.Release - -# Python byte code -*.pyc - -# Binaries -# -------- -*.dll -*.exe - +# This file is used to ignore files which are generated +# ---------------------------------------------------------------------------- + +*~ +*.autosave +*.a +*.core +*.moc +*.o +*.obj +*.orig +*.rej +*.so +*.so.* +*_pch.h.cpp +*_resource.rc +*.qm +.#* +*.*# +core +!core/ +tags +.DS_Store +.directory +*.debug +Makefile* +*.prl +*.app +moc_*.cpp +ui_*.h +qrc_*.cpp +Thumbs.db +*.res +*.rc +/.qmake.cache +/.qmake.stash + +# qtcreator generated files +*.pro.user* + +# xemacs temporary files +*.flc + +# Vim temporary files +.*.swp + +# Visual Studio generated files +*.ib_pdb_index +*.idb +*.ilk +*.pdb +*.sln +*.suo +*.vcproj +*vcproj.*.*.user +*.ncb +*.sdf +*.opensdf +*.vcxproj +*vcxproj.* + +# MinGW generated files +*.Debug +*.Release + +# Python byte code +*.pyc + +# Binaries +# -------- +*.dll +*.exe + diff --git a/include/ThreadPool.h b/include/ThreadPool.h index 4b4ba17..ad87ecb 100644 --- a/include/ThreadPool.h +++ b/include/ThreadPool.h @@ -14,105 +14,110 @@ it. Using this library is your own responsibility #ifndef THREADPOOL_H #define THREADPOOL_H +#include #include #include #include #include #include -#include -class ThreadPool { - long numOfThreads; - std::deque> _tasks; - std::mutex _queueLock; - bool conclude_work = false; - bool started_already = false; - std::condition_variable _queueCond; - std::condition_variable _threadFinishedCond; - long num_of_threads_running = 0; - std::vector _threads; +class ThreadPool +{ + long numOfThreads; + std::deque> _tasks; + std::mutex _queueLock; + bool conclude_work = false; + bool started_already = false; + std::condition_variable _queueCond; + std::condition_variable _threadFinishedCond; + long num_of_threads_running = 0; + std::vector _threads; protected: - inline void thread_worker(); - inline void join_all(); + inline void thread_worker(); + inline void join_all(); public: - inline ThreadPool(); - inline ~ThreadPool(); - inline void push(const std::function &task); - inline void push(std::function &&task); - inline void - start(const long NumOfThreads = std::thread::hardware_concurrency()); - inline void finish(); + inline ThreadPool(); + inline ~ThreadPool(); + inline void push(const std::function& task); + inline void push(std::function&& task); + inline void start(const long NumOfThreads = std::thread::hardware_concurrency()); + inline void finish(); }; -void ThreadPool::thread_worker() { - while (true) { - std::unique_lock lg(_queueLock); - while (_tasks.empty() && !conclude_work) { - _queueCond.wait(lg); +void ThreadPool::thread_worker() +{ + while (true) { + std::unique_lock lg(_queueLock); + while (_tasks.empty() && !conclude_work) { + _queueCond.wait(lg); + } + if (!_tasks.empty()) { + auto theTask = _tasks[0]; + _tasks.pop_front(); + lg.unlock(); + theTask(); + lg.lock(); + } + if (_tasks.empty() && conclude_work) { + num_of_threads_running--; + _threadFinishedCond.notify_one(); + break; + } } - if (!_tasks.empty()) { - auto theTask = _tasks[0]; - _tasks.pop_front(); - lg.unlock(); - theTask(); - lg.lock(); - } - if (_tasks.empty() && conclude_work) { - num_of_threads_running--; - _threadFinishedCond.notify_one(); - break; - } - } } -void ThreadPool::join_all() { - for (long i = 0; i < static_cast(_threads.size()); i++) { - if (_threads[i].joinable()) - _threads[i].join(); - } +void ThreadPool::join_all() +{ + for (long i = 0; i < static_cast(_threads.size()); i++) { + if (_threads[i].joinable()) + _threads[i].join(); + } } ThreadPool::ThreadPool() {} ThreadPool::~ThreadPool() { this->finish(); } -void ThreadPool::push(const std::function &task) { - std::unique_lock lg(_queueLock); - _tasks.push_back(task); - _queueCond.notify_one(); +void ThreadPool::push(const std::function& task) +{ + std::unique_lock lg(_queueLock); + _tasks.push_back(task); + _queueCond.notify_one(); } -void ThreadPool::push(std::function&& task) { +void ThreadPool::push(std::function&& task) +{ std::unique_lock lg(_queueLock); _tasks.push_back(std::move(task)); _queueCond.notify_one(); } -void ThreadPool::start(const long NumOfThreads) { - if (!started_already) - started_already = true; - else - throw std::logic_error("You cannot start the thread pool multiple times."); - numOfThreads = NumOfThreads; - _threads.reserve(numOfThreads); - for (long i = 0; i < numOfThreads; i++) { - _threads.push_back( - std::thread(std::bind(&ThreadPool::thread_worker, this))); - num_of_threads_running++; - } +void ThreadPool::start(const long NumOfThreads) +{ + if (!started_already) + started_already = true; + else + throw std::logic_error("You cannot start the thread pool multiple times."); + numOfThreads = NumOfThreads; + _threads.reserve(numOfThreads); + for (long i = 0; i < numOfThreads; i++) { + _threads.push_back(std::thread(std::bind(&ThreadPool::thread_worker, this))); + num_of_threads_running++; + } } -void ThreadPool::finish() { - std::unique_lock lg(_queueLock); - conclude_work = true; - _queueCond.notify_all(); - while (num_of_threads_running > 0) { - _threadFinishedCond.wait(lg); - } - lg.unlock(); - join_all(); +void ThreadPool::finish() +{ + std::unique_lock lg(_queueLock); + conclude_work = true; + _queueCond.notify_all(); + while (num_of_threads_running > 0) { + _threadFinishedCond.wait(lg); + } + lg.unlock(); + join_all(); } #endif // THREADPOOL_H diff --git a/main.cpp b/main.cpp index 7c4d5df..87914cf 100644 --- a/main.cpp +++ b/main.cpp @@ -1,55 +1,61 @@ +#include #include #include "include/ThreadPool.h" -void SumZeroToNumber(long &num) { - long val = 0; - for (long i = 0; i <= static_cast(num); i++) { - val += i; - } - num = val; +void SumZeroToNumber(long& num) +{ + long val = 0; + for (long i = 0; i <= static_cast(num); i++) { + val += i; + } + num = val; } -int main() { - for (int tries = 0; tries < 10; tries++) { - std::cout << "Try number: " << tries + 1 << "... "; +int main() +{ + for (int tries = 0; tries < 10; tries++) { + std::cout << "Try number: " << tries + 1 << "... "; - std::vector nums(100000); - std::vector numsResults(nums.size()); - std::vector numsSequentialResults(nums.size()); - try { - ThreadPool pool; // num of cores - pool.start(); - for (unsigned i = 0; i < nums.size(); i++) { - nums[i] = rand() % 1000; - numsResults[i] = nums[i]; - numsSequentialResults[i] = nums[i]; - // this is just a copy to be passed and - // modified, while the original remains - // unchanged - pool.push(std::bind(SumZeroToNumber, std::ref(numsResults[i]))); - } - // std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - pool.finish(); - } catch (std::exception &ex) { - std::cout - << "An exception was thrown while processing data. Exception says: " - << ex.what() << std::endl; - std::exit(1); - } - // test results sequentially - for (unsigned i = 0; i < nums.size(); i++) { - SumZeroToNumber(numsSequentialResults[i]); - if (numsSequentialResults[i] != numsResults[i]) { - std::cout << "failed." << std::endl; - std::cout << "Comparing " << numsSequentialResults[i] << " with " - << numsResults[i] << " failed." << std::endl; - throw std::runtime_error("Results didn't match!"); - } - } - std::cout << "succeeded." << std::endl; - } + std::vector nums(100000); + std::vector numsResults(nums.size()); + std::vector numsSequentialResults(nums.size()); + try { + ThreadPool pool; // num of cores + pool.start(); + for (unsigned i = 0; i < nums.size(); i++) { + nums[i] = rand() % 1000; + numsResults[i] = nums[i]; + numsSequentialResults[i] = nums[i]; + // this is just a copy to be passed and + // modified, while the original remains + // unchanged + pool.push(std::bind(SumZeroToNumber, std::ref(numsResults[i]))); + } + long num = 10; + std::function task(std::bind(SumZeroToNumber, std::ref(num))); + pool.push(std::move(task)); - std::cout << "End of program reached." << std::endl; - return 0; + // std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + pool.finish(); + } catch (std::exception& ex) { + std::cout << "An exception was thrown while processing data. Exception says: " << ex.what() + << std::endl; + std::exit(1); + } + // test results sequentially + for (unsigned i = 0; i < nums.size(); i++) { + SumZeroToNumber(numsSequentialResults[i]); + if (numsSequentialResults[i] != numsResults[i]) { + std::cout << "failed." << std::endl; + std::cout << "Comparing " << numsSequentialResults[i] << " with " << numsResults[i] + << " failed." << std::endl; + throw std::runtime_error("Results didn't match!"); + } + } + std::cout << "succeeded." << std::endl; + } + + std::cout << "End of program reached." << std::endl; + return 0; }