Some reformatting and changed packaged_task to function

This commit is contained in:
Samer Afach 2019-06-17 14:38:29 +00:00
parent 6511898193
commit 11332c8cf3
3 changed files with 199 additions and 188 deletions

146
.gitignore vendored
View File

@ -1,73 +1,73 @@
# This file is used to ignore files which are generated # This file is used to ignore files which are generated
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
*~ *~
*.autosave *.autosave
*.a *.a
*.core *.core
*.moc *.moc
*.o *.o
*.obj *.obj
*.orig *.orig
*.rej *.rej
*.so *.so
*.so.* *.so.*
*_pch.h.cpp *_pch.h.cpp
*_resource.rc *_resource.rc
*.qm *.qm
.#* .#*
*.*# *.*#
core core
!core/ !core/
tags tags
.DS_Store .DS_Store
.directory .directory
*.debug *.debug
Makefile* Makefile*
*.prl *.prl
*.app *.app
moc_*.cpp moc_*.cpp
ui_*.h ui_*.h
qrc_*.cpp qrc_*.cpp
Thumbs.db Thumbs.db
*.res *.res
*.rc *.rc
/.qmake.cache /.qmake.cache
/.qmake.stash /.qmake.stash
# qtcreator generated files # qtcreator generated files
*.pro.user* *.pro.user*
# xemacs temporary files # xemacs temporary files
*.flc *.flc
# Vim temporary files # Vim temporary files
.*.swp .*.swp
# Visual Studio generated files # Visual Studio generated files
*.ib_pdb_index *.ib_pdb_index
*.idb *.idb
*.ilk *.ilk
*.pdb *.pdb
*.sln *.sln
*.suo *.suo
*.vcproj *.vcproj
*vcproj.*.*.user *vcproj.*.*.user
*.ncb *.ncb
*.sdf *.sdf
*.opensdf *.opensdf
*.vcxproj *.vcxproj
*vcxproj.* *vcxproj.*
# MinGW generated files # MinGW generated files
*.Debug *.Debug
*.Release *.Release
# Python byte code # Python byte code
*.pyc *.pyc
# Binaries # Binaries
# -------- # --------
*.dll *.dll
*.exe *.exe

View File

@ -14,105 +14,110 @@ it. Using this library is your own responsibility
#ifndef THREADPOOL_H #ifndef THREADPOOL_H
#define THREADPOOL_H #define THREADPOOL_H
#include <condition_variable>
#include <deque> #include <deque>
#include <functional> #include <functional>
#include <mutex> #include <mutex>
#include <thread> #include <thread>
#include <vector> #include <vector>
#include <condition_variable>
class ThreadPool { class ThreadPool
long numOfThreads; {
std::deque<std::function<void()>> _tasks; long numOfThreads;
std::mutex _queueLock; std::deque<std::function<void()>> _tasks;
bool conclude_work = false; std::mutex _queueLock;
bool started_already = false; bool conclude_work = false;
std::condition_variable _queueCond; bool started_already = false;
std::condition_variable _threadFinishedCond; std::condition_variable _queueCond;
long num_of_threads_running = 0; std::condition_variable _threadFinishedCond;
std::vector<std::thread> _threads; long num_of_threads_running = 0;
std::vector<std::thread> _threads;
protected: protected:
inline void thread_worker(); inline void thread_worker();
inline void join_all(); inline void join_all();
public: public:
inline ThreadPool(); inline ThreadPool();
inline ~ThreadPool(); inline ~ThreadPool();
inline void push(const std::function<void()> &task); inline void push(const std::function<void()>& task);
inline void push(std::function<void ()> &&task); inline void push(std::function<void()>&& task);
inline void inline void start(const long NumOfThreads = std::thread::hardware_concurrency());
start(const long NumOfThreads = std::thread::hardware_concurrency()); inline void finish();
inline void finish();
}; };
void ThreadPool::thread_worker() { void ThreadPool::thread_worker()
while (true) { {
std::unique_lock<decltype(_queueLock)> lg(_queueLock); while (true) {
while (_tasks.empty() && !conclude_work) { std::unique_lock<decltype(_queueLock)> lg(_queueLock);
_queueCond.wait(lg); while (_tasks.empty() && !conclude_work) {
_queueCond.wait(lg);
}
if (!_tasks.empty()) {
auto theTask = _tasks[0];
_tasks.pop_front();
lg.unlock();
theTask();
lg.lock();
}
if (_tasks.empty() && conclude_work) {
num_of_threads_running--;
_threadFinishedCond.notify_one();
break;
}
} }
if (!_tasks.empty()) {
auto theTask = _tasks[0];
_tasks.pop_front();
lg.unlock();
theTask();
lg.lock();
}
if (_tasks.empty() && conclude_work) {
num_of_threads_running--;
_threadFinishedCond.notify_one();
break;
}
}
} }
void ThreadPool::join_all() { void ThreadPool::join_all()
for (long i = 0; i < static_cast<long>(_threads.size()); i++) { {
if (_threads[i].joinable()) for (long i = 0; i < static_cast<long>(_threads.size()); i++) {
_threads[i].join(); if (_threads[i].joinable())
} _threads[i].join();
}
} }
ThreadPool::ThreadPool() {} ThreadPool::ThreadPool() {}
ThreadPool::~ThreadPool() { this->finish(); } ThreadPool::~ThreadPool() { this->finish(); }
void ThreadPool::push(const std::function<void()> &task) { void ThreadPool::push(const std::function<void()>& task)
std::unique_lock<decltype(_queueLock)> lg(_queueLock); {
_tasks.push_back(task); std::unique_lock<decltype(_queueLock)> lg(_queueLock);
_queueCond.notify_one(); _tasks.push_back(task);
_queueCond.notify_one();
} }
void ThreadPool::push(std::function<void ()>&& task) { void ThreadPool::push(std::function<void()>&& task)
{
std::unique_lock<decltype(_queueLock)> lg(_queueLock); std::unique_lock<decltype(_queueLock)> lg(_queueLock);
_tasks.push_back(std::move(task)); _tasks.push_back(std::move(task));
_queueCond.notify_one(); _queueCond.notify_one();
} }
void ThreadPool::start(const long NumOfThreads) { void ThreadPool::start(const long NumOfThreads)
if (!started_already) {
started_already = true; if (!started_already)
else started_already = true;
throw std::logic_error("You cannot start the thread pool multiple times."); else
numOfThreads = NumOfThreads; throw std::logic_error("You cannot start the thread pool multiple times.");
_threads.reserve(numOfThreads); numOfThreads = NumOfThreads;
for (long i = 0; i < numOfThreads; i++) { _threads.reserve(numOfThreads);
_threads.push_back( for (long i = 0; i < numOfThreads; i++) {
std::thread(std::bind(&ThreadPool::thread_worker, this))); _threads.push_back(std::thread(std::bind(&ThreadPool::thread_worker, this)));
num_of_threads_running++; num_of_threads_running++;
} }
} }
void ThreadPool::finish() { void ThreadPool::finish()
std::unique_lock<decltype(_queueLock)> lg(_queueLock); {
conclude_work = true; std::unique_lock<decltype(_queueLock)> lg(_queueLock);
_queueCond.notify_all(); conclude_work = true;
while (num_of_threads_running > 0) { _queueCond.notify_all();
_threadFinishedCond.wait(lg); while (num_of_threads_running > 0) {
} _threadFinishedCond.wait(lg);
lg.unlock(); }
join_all(); lg.unlock();
join_all();
} }
#endif // THREADPOOL_H #endif // THREADPOOL_H

View File

@ -1,55 +1,61 @@
#include <future>
#include <iostream> #include <iostream>
#include "include/ThreadPool.h" #include "include/ThreadPool.h"
void SumZeroToNumber(long &num) { void SumZeroToNumber(long& num)
long val = 0; {
for (long i = 0; i <= static_cast<long>(num); i++) { long val = 0;
val += i; for (long i = 0; i <= static_cast<long>(num); i++) {
} val += i;
num = val; }
num = val;
} }
int main() { int main()
for (int tries = 0; tries < 10; tries++) { {
std::cout << "Try number: " << tries + 1 << "... "; for (int tries = 0; tries < 10; tries++) {
std::cout << "Try number: " << tries + 1 << "... ";
std::vector<long> nums(100000); std::vector<long> nums(100000);
std::vector<long> numsResults(nums.size()); std::vector<long> numsResults(nums.size());
std::vector<long> numsSequentialResults(nums.size()); std::vector<long> numsSequentialResults(nums.size());
try { try {
ThreadPool pool; // num of cores ThreadPool pool; // num of cores
pool.start(); pool.start();
for (unsigned i = 0; i < nums.size(); i++) { for (unsigned i = 0; i < nums.size(); i++) {
nums[i] = rand() % 1000; nums[i] = rand() % 1000;
numsResults[i] = nums[i]; numsResults[i] = nums[i];
numsSequentialResults[i] = nums[i]; numsSequentialResults[i] = nums[i];
// this is just a copy to be passed and // this is just a copy to be passed and
// modified, while the original remains // modified, while the original remains
// unchanged // unchanged
pool.push(std::bind(SumZeroToNumber, std::ref(numsResults[i]))); pool.push(std::bind(SumZeroToNumber, std::ref(numsResults[i])));
} }
// std::this_thread::sleep_for(std::chrono::milliseconds(1000)); long num = 10;
pool.finish(); std::function<void()> task(std::bind(SumZeroToNumber, std::ref(num)));
} catch (std::exception &ex) { pool.push(std::move(task));
std::cout
<< "An exception was thrown while processing data. Exception says: "
<< ex.what() << std::endl;
std::exit(1);
}
// test results sequentially
for (unsigned i = 0; i < nums.size(); i++) {
SumZeroToNumber(numsSequentialResults[i]);
if (numsSequentialResults[i] != numsResults[i]) {
std::cout << "failed." << std::endl;
std::cout << "Comparing " << numsSequentialResults[i] << " with "
<< numsResults[i] << " failed." << std::endl;
throw std::runtime_error("Results didn't match!");
}
}
std::cout << "succeeded." << std::endl;
}
std::cout << "End of program reached." << std::endl; // std::this_thread::sleep_for(std::chrono::milliseconds(1000));
return 0; pool.finish();
} catch (std::exception& ex) {
std::cout << "An exception was thrown while processing data. Exception says: " << ex.what()
<< std::endl;
std::exit(1);
}
// test results sequentially
for (unsigned i = 0; i < nums.size(); i++) {
SumZeroToNumber(numsSequentialResults[i]);
if (numsSequentialResults[i] != numsResults[i]) {
std::cout << "failed." << std::endl;
std::cout << "Comparing " << numsSequentialResults[i] << " with " << numsResults[i]
<< " failed." << std::endl;
throw std::runtime_error("Results didn't match!");
}
}
std::cout << "succeeded." << std::endl;
}
std::cout << "End of program reached." << std::endl;
return 0;
} }