From 1a56e9bc1d52d8bd2600226720964d68b35232f6 Mon Sep 17 00:00:00 2001 From: Samer Afach Date: Sun, 15 Jan 2017 11:16:00 +0100 Subject: [PATCH] First commit --- Makefile | 26 ++++++++ Threadpool_pthread.pro | 27 ++++++++ examples/sumnums.cpp | 37 +++++++++++ include/LockGuard_pthread.h | 28 +++++++++ include/ThreadPool_pthread.h | 89 ++++++++++++++++++++++++++ src/LockGuard_pthread.cpp | 29 +++++++++ src/ThreadPool_pthread.cpp | 117 +++++++++++++++++++++++++++++++++++ 7 files changed, 353 insertions(+) create mode 100644 Makefile create mode 100644 Threadpool_pthread.pro create mode 100644 examples/sumnums.cpp create mode 100644 include/LockGuard_pthread.h create mode 100644 include/ThreadPool_pthread.h create mode 100644 src/LockGuard_pthread.cpp create mode 100644 src/ThreadPool_pthread.cpp diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..2747131 --- /dev/null +++ b/Makefile @@ -0,0 +1,26 @@ +CXX = g++ +INC_DIR = include +OBJ_DIR = object +SRC_DIR = src +EXE_DIR = bin +CFLAGS = -O3 -ffast-math -std=c++98 -pedantic -pthread +TEST_FILE = $(EXE_DIR)/SumNums +TARGET = $(EXE_DIR)/SumNums +MKDIR_P = mkdir -p + +SRC = $(SRC_DIR)/ThreadPool_pthread.cpp $(SRC_DIR)/LockGuard_pthread.cpp examples/sumnums.cpp +OBJ = $(OBJ_DIR)/ThreadPool_pthread.o $(OBJ_DIR)/LockGuard_pthread.o $(OBJ_DIR)/sumnums.o +DEP = $(INC_DIR)/ThreadPool_pthread.h $(INC_DIR)/LockGuard_pthread.h + +all: $(TARGET) + +$(OBJ): $(SRC) $(DEP) + $(MKDIR_P) $(@D) + $(CXX) $(CFLAGS) -I$(INC_DIR) -c $< -o $@ + +$(TARGET): $(OBJ) $(DEP) + $(MKDIR_P) $(@D) + $(CXX) $(CFLAGS) -o $@ $(SRC) -I$(INC_DIR) + +clean: + $(RM) $(OBJ_DIR)/*.o $(TARGET) $(TEST_FILE) diff --git a/Threadpool_pthread.pro b/Threadpool_pthread.pro new file mode 100644 index 0000000..db9303b --- /dev/null +++ b/Threadpool_pthread.pro @@ -0,0 +1,27 @@ +QT -= core +QT -= gui + +CONFIG += c++98 + +TARGET = Threadpool_pthread +CONFIG += console +CONFIG -= app_bundle +CONFIG -= qt + +TEMPLATE = app + +SOURCES += \ + src/LockGuard_pthread.cpp \ + src/ThreadPool_pthread.cpp \ + examples/sumnums.cpp + +HEADERS += \ + include/LockGuard_pthread.h \ + include/ThreadPool_pthread.h + +OTHER_FILES += Makefile + +INCLUDEPATH += include + +LIBS += -pthread + diff --git a/examples/sumnums.cpp b/examples/sumnums.cpp new file mode 100644 index 0000000..cf24daa --- /dev/null +++ b/examples/sumnums.cpp @@ -0,0 +1,37 @@ +#include "../include/ThreadPool_pthread.h" +#include +#include + +void* SumZeroToNumber(void* num) +{ + long val = 0; + for(long i = 0; i <= *static_cast(num); i++) { + val += i; + } + *static_cast(num) = val; + return NULL; +} +int main() +{ + std::vector nums(10000); + std::vector numsResults(10000); + try + { + ThreadPool_pthread pool(8); //num of cores + for(unsigned i = 0; i < nums.size(); i++) { + nums[i] = rand() % 1000; + numsResults[i] = nums[i]; //this is just a copy to be passed and modified, while the original remains unchanged + pool.push_task(SumZeroToNumber,static_cast(&numsResults[i])); + } + pool.finish(); + } + catch(std::exception &ex) + { + std::cout<<"An exception was thrown while processing data. Exception says: " << ex.what() << std::endl; + std::exit(1); + } + for (unsigned int i = 0; i < nums.size(); ++i) { + std::cout<<"Sum from 0 to " << nums[i] << " is: " << numsResults[i] < +#include +#include +#include +#include +#include + +#include "pthread.h" + +#include "LockGuard_pthread.h" + +/** + * @brief The ThreadPool class + * A basic thread pool implementation using pthread + * + * Call with the constructor ThreadPool::ThreadPool(N) to get N threads. + * The default constructor will detect the number of available cores/core threads and use it. + * Constructor throws std::runtime_error() on failure to spawn processes. + * + * + * Add tasks using ThreadPool::push_task(function, args). More info at the function description. + * + * Use ThreadPool::finish() to wait for all tasks to process and join all threads. + * + * + * + * + */ +class ThreadPool_pthread +{ + pthread_cond_t task_queue_cond; //signaled when a new task is pushed + pthread_cond_t thread_finished_cond; //signaled when a thread finishes and no more is there to do + pthread_mutex_t mutex; + bool no_more_to_push; + + typedef void*(*FunctionType)(void*); + + //queue of tasks as an std::queue of std::pair of functions and args + std::queue > tasks_queue; + + + void launch_threads(); + int num_of_threads; //stores the number of threads requested + std::vector threads_objects; + friend void* _thread_worker(void*); + bool joined; //prevents rejoining twice, true when all threads are joined + long threads_running; //keeps track of threads that are executing tasks + void join_all(); + +public: + ThreadPool_pthread(int threads_count); + virtual ~ThreadPool_pthread(); + + /** + * @brief push_task + * Adds a task to the processing queue + * + * Results are passed through parameters too. No return value is considered. + * + * @param function + * Function pointer to the function to be executed. + * The function is of type void*(void*) + * All parameters should be passed through a single void pointer + * + * @param params + * Parameters of the function as void*. + */ + void push_task(FunctionType function, void* params); + + /** + * @brief finish + * Waits for all threads to finish and joins them. + */ + void finish(); +}; + +template +std::string _to_string(T const& value) +{ + std::stringstream sstr; + + sstr << value; + return sstr.str(); +} + +#endif // THREADPOOL_H diff --git a/src/LockGuard_pthread.cpp b/src/LockGuard_pthread.cpp new file mode 100644 index 0000000..8e1fef4 --- /dev/null +++ b/src/LockGuard_pthread.cpp @@ -0,0 +1,29 @@ +#include "LockGuard_pthread.h" + +LockGuard_pthread::LockGuard_pthread(pthread_mutex_t &LockRef) : _lock(LockRef), is_locked(false) +{ + lock(); +} + +LockGuard_pthread::~LockGuard_pthread() +{ + unlock(); +} + +void LockGuard_pthread::lock() +{ + if(!is_locked) + { + pthread_mutex_lock(&_lock); + is_locked = true; + } +} + +void LockGuard_pthread::unlock() +{ + if(is_locked) + { + pthread_mutex_unlock(&_lock); + is_locked = false; + } +} diff --git a/src/ThreadPool_pthread.cpp b/src/ThreadPool_pthread.cpp new file mode 100644 index 0000000..f26afc2 --- /dev/null +++ b/src/ThreadPool_pthread.cpp @@ -0,0 +1,117 @@ +#include "ThreadPool_pthread.h" +#include +#include + +void* _thread_worker(void* pool_obj) +{ + ThreadPool_pthread* pool = static_cast(pool_obj); + while(true) + { + //variable to store a pointer to the function and the parameters + std::pair func_and_args; + LockGuard_pthread guard(pool->mutex); + + //if the queue isn't empty, and finish() isn't called, wait for more tasks + while (pool->tasks_queue.empty() && !pool->no_more_to_push) + { + pthread_cond_wait(&pool->task_queue_cond, &pool->mutex); + } + //pull task from queue and execute it if the queue isn't empty + if(!pool->tasks_queue.empty()) + { + func_and_args = pool->tasks_queue.front(); + pool->tasks_queue.pop(); + + guard.unlock(); + + //execute task + func_and_args.first(func_and_args.second); + guard.lock(); + } + //if the queue is empty and a "finished" signal was given, exit the + //loop and signal that this thread is finished + if(pool->tasks_queue.empty() && pool->no_more_to_push) + { + pool->threads_running--; + pthread_cond_signal(&pool->thread_finished_cond); + break; + } + } + pthread_exit(NULL); + return NULL; +} + +void ThreadPool_pthread::launch_threads() +{ + threads_objects.resize(num_of_threads); + int rc; + std::cout << "Launching " << num_of_threads << " threads." << std::endl; + + //prevent starting tasks while launching threads + LockGuard_pthread guard(mutex); + for (int i=0; i < num_of_threads; ++i) + { + rc = pthread_create(&threads_objects[i], 0, _thread_worker, static_cast(this)); + if(rc) + { + this->finish(); + throw std::runtime_error("Unable to launch threads. Error code " + _to_string(rc)); + } + threads_running++; + } + std::cout << "Launching " << num_of_threads << " threads successful." << std::endl; +} + +void ThreadPool_pthread::join_all() +{ + if(!joined) + { + for(unsigned i = 0; i < threads_objects.size(); i++) + { + pthread_join(threads_objects[i],NULL); + } + joined = true; + } +} + +void ThreadPool_pthread::finish() +{ + LockGuard_pthread guard(this->mutex); + this->no_more_to_push = true; + pthread_cond_broadcast(&this->task_queue_cond); + while(threads_running > 0) + { + pthread_cond_wait(&this->thread_finished_cond, &this->mutex); + } + guard.unlock(); + join_all(); +} + +ThreadPool_pthread::~ThreadPool_pthread() +{ + this->finish(); + pthread_mutex_destroy(&mutex); + pthread_cond_destroy(&task_queue_cond); + pthread_cond_destroy(&thread_finished_cond); +} + +ThreadPool_pthread::ThreadPool_pthread(int threads_count) +{ + pthread_mutex_init(&mutex, NULL); + no_more_to_push = false; + threads_running = 0; + joined = true; + pthread_cond_init(&task_queue_cond, 0); + pthread_cond_init(&thread_finished_cond, 0); + num_of_threads = threads_count; + joined = false; + no_more_to_push = false; + launch_threads(); +} + +void ThreadPool_pthread::push_task(ThreadPool_pthread::FunctionType func, void *params) +{ + LockGuard_pthread guard(mutex); + this->tasks_queue.push(std::make_pair(func,params)); + pthread_cond_signal(&task_queue_cond); +}