From 20f6a9a8fd377e21be7c1bfae6379cd03c3c9fe7 Mon Sep 17 00:00:00 2001 From: Samer Afach Date: Tue, 6 Jun 2017 19:02:45 +0200 Subject: [PATCH] First commit with a tested version of the thread pool --- .gitignore | 73 +++++++++++++++++++++++++++++++ CMakeLists.txt | 8 ++++ include/ThreadPool.cpp | 1 + include/ThreadPool.h | 98 ++++++++++++++++++++++++++++++++++++++++++ main.cpp | 55 ++++++++++++++++++++++++ 5 files changed, 235 insertions(+) create mode 100644 .gitignore create mode 100644 CMakeLists.txt create mode 100644 include/ThreadPool.cpp create mode 100644 include/ThreadPool.h create mode 100644 main.cpp diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fab7372 --- /dev/null +++ b/.gitignore @@ -0,0 +1,73 @@ +# This file is used to ignore files which are generated +# ---------------------------------------------------------------------------- + +*~ +*.autosave +*.a +*.core +*.moc +*.o +*.obj +*.orig +*.rej +*.so +*.so.* +*_pch.h.cpp +*_resource.rc +*.qm +.#* +*.*# +core +!core/ +tags +.DS_Store +.directory +*.debug +Makefile* +*.prl +*.app +moc_*.cpp +ui_*.h +qrc_*.cpp +Thumbs.db +*.res +*.rc +/.qmake.cache +/.qmake.stash + +# qtcreator generated files +*.pro.user* + +# xemacs temporary files +*.flc + +# Vim temporary files +.*.swp + +# Visual Studio generated files +*.ib_pdb_index +*.idb +*.ilk +*.pdb +*.sln +*.suo +*.vcproj +*vcproj.*.*.user +*.ncb +*.sdf +*.opensdf +*.vcxproj +*vcxproj.* + +# MinGW generated files +*.Debug +*.Release + +# Python byte code +*.pyc + +# Binaries +# -------- +*.dll +*.exe + diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..dece3ed --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,8 @@ +cmake_minimum_required(VERSION 2.8) + +project(ThreadPool) +add_library(threadpool_lib "src/ThreadPool.cpp") + +add_executable(${PROJECT_NAME} "main.cpp") + +target_link_libraries(${PROJECT_NAME} threadpool_lib) diff --git a/include/ThreadPool.cpp b/include/ThreadPool.cpp new file mode 100644 index 0000000..f429d68 --- /dev/null +++ b/include/ThreadPool.cpp @@ -0,0 +1 @@ +#include "ThreadPool.h" diff --git a/include/ThreadPool.h b/include/ThreadPool.h new file mode 100644 index 0000000..fe40eaf --- /dev/null +++ b/include/ThreadPool.h @@ -0,0 +1,98 @@ +#ifndef THREADPOOL_H +#define THREADPOOL_H + +#include +#include +#include +#include +#include +#include + +class ThreadPool { + int numOfThreads; + std::deque> _tasks; + std::mutex _queueLock; + std::atomic stopped{false}; + bool startedAlready = false; + std::condition_variable _queueCond; + std::condition_variable _threadFinishedCond; + std::atomic_int numOfThreadsRunning{0}; + std::vector _threads; + +protected: + inline void threadWorker(); + inline void joinAll(); + +public: + inline ThreadPool(); + inline ~ThreadPool(); + inline void push(const std::function &task); + inline void + start(const int NumOfThreads = std::thread::hardware_concurrency()); + inline void finish(); +}; + +void ThreadPool::threadWorker() { + while (true) { + std::unique_lock lg(_queueLock); + while (_tasks.empty() && !stopped.load()) { + _queueCond.wait(lg); + } + if (!_tasks.empty()) { + auto theTask = _tasks[0]; + _tasks.pop_front(); + lg.unlock(); + theTask(); + lg.lock(); + } + if (_tasks.empty() && stopped.load()) { + numOfThreadsRunning--; + _threadFinishedCond.notify_one(); + break; + } + } +} + +void ThreadPool::joinAll() { + for (long i = 0; i < numOfThreads; i++) { + if (_threads[i].joinable()) + _threads[i].join(); + } +} + +ThreadPool::ThreadPool() {} + +ThreadPool::~ThreadPool() { this->finish(); } + +void ThreadPool::push(const std::function &task) { + std::unique_lock lg(_queueLock); + _tasks.push_back(task); + lg.unlock(); + _queueCond.notify_one(); +} + +void ThreadPool::start(const int NumOfThreads) { + if (!startedAlready) + startedAlready = true; + else + throw std::logic_error("You cannot start the thread pool multiple times."); + numOfThreads = NumOfThreads; + _threads.reserve(numOfThreads); + for (long i = 0; i < numOfThreads; i++) { + _threads.push_back(std::thread(std::bind(&ThreadPool::threadWorker, this))); + numOfThreadsRunning++; + } +} + +void ThreadPool::finish() { + std::unique_lock lg(_queueLock); + stopped.store(true); + _queueCond.notify_all(); + while (numOfThreadsRunning > 0) { + _threadFinishedCond.wait(lg); + } + lg.unlock(); + joinAll(); +} + +#endif // THREADPOOL_H diff --git a/main.cpp b/main.cpp new file mode 100644 index 0000000..a6c574d --- /dev/null +++ b/main.cpp @@ -0,0 +1,55 @@ +#include + +#include "src/ThreadPool.h" + +void SumZeroToNumber(long &num) { + long val = 0; + for (long i = 0; i <= static_cast(num); i++) { + val += i; + } + static_cast(num) = val; +} + +int main() { + for (int tries = 0; tries < 10; tries++) { + std::cout << "Try number: " << tries + 1 << "... "; + + std::vector nums(100000); + std::vector numsResults(nums.size()); + std::vector numsSequentialResults(nums.size()); + try { + ThreadPool pool; // num of cores + pool.start(); + for (unsigned i = 0; i < nums.size(); i++) { + nums[i] = rand() % 1000; + numsResults[i] = nums[i]; + numsSequentialResults[i] = nums[i]; + // this is just a copy to be passed and + // modified, while the original remains + // unchanged + pool.push(std::bind(SumZeroToNumber, std::ref(numsResults[i]))); + } + // std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + pool.finish(); + } catch (std::exception &ex) { + std::cout + << "An exception was thrown while processing data. Exception says: " + << ex.what() << std::endl; + std::exit(1); + } + // test results sequentially + for (unsigned i = 0; i < nums.size(); i++) { + SumZeroToNumber(numsSequentialResults[i]); + if (numsSequentialResults[i] != numsResults[i]) { + std::cout << "failed." << std::endl; + std::cout << "Comparing " << numsSequentialResults[i] << " with " + << numsResults[i] << " failed." << std::endl; + throw std::runtime_error("Results didn't match!"); + } + } + std::cout << "succeeded." << std::endl; + } + + std::cout << "End of program reached." << std::endl; + return 0; +}