#include "ThreadPool_pthread.h"
#include <iostream>
#include <stdexcept>

/**
 * Entry point run by every worker thread of a ThreadPool_pthread.
 *
 * The worker repeatedly takes one task from the pool's queue and executes it
 * with the pool mutex released. It terminates once the queue is empty and the
 * pool's no_more_to_push flag has been set (see ThreadPool_pthread::finish()).
 *
 * @param pool_obj pointer to the owning ThreadPool_pthread, passed as void*
 *                 through pthread_create.
 * @return always NULL.
 */
void* _thread_worker(void* pool_obj)
{
    ThreadPool_pthread* pool = static_cast<ThreadPool_pthread*>(pool_obj);
    while (true)
    {
        // the task function together with the argument it will be called with
        std::pair<ThreadPool_pthread::FunctionType, void*> func_and_args;
        LockGuard_pthread guard(pool->mutex);

        // while the queue is empty and finish() has not been called, sleep
        // until a task is pushed or the pool is told to finish
        while (pool->tasks_queue.empty() && !pool->no_more_to_push)
        {
            pthread_cond_wait(&pool->task_queue_cond, &pool->mutex);
        }

        // pull a task from the queue and execute it if the queue isn't empty
        if (!pool->tasks_queue.empty())
        {
            func_and_args = pool->tasks_queue.front();
            pool->tasks_queue.pop();

            // run the task without holding the mutex so that other workers
            // and push_task() are not blocked while it executes
            guard.unlock();
            func_and_args.first(func_and_args.second);
            guard.lock();
        }

        // if the queue is empty and a "finished" signal was given, account for
        // this thread's exit and wake up finish(), then leave the loop
        if (pool->tasks_queue.empty() && pool->no_more_to_push)
        {
            pool->threads_running--;
            pthread_cond_signal(&pool->thread_finished_cond);
            break;
        }
    }
    // Returning from the start routine terminates the thread just like
    // pthread_exit(NULL) would, without leaving unreachable code behind.
    return NULL;
}

/**
 * Creates num_of_threads worker threads running _thread_worker.
 *
 * The pool mutex is held while the threads are created, so no worker can
 * start processing before all of them have been launched and counted in
 * threads_running.
 *
 * @throws std::runtime_error if pthread_create fails. Threads that were
 *         already created are shut down and joined before the exception
 *         is thrown.
 */
void ThreadPool_pthread::launch_threads()
{
    threads_objects.resize(num_of_threads);
    std::cout << "Launching " << num_of_threads << " threads." << std::endl;

    // prevent starting tasks while launching threads
    LockGuard_pthread guard(mutex);
    for (int i = 0; i < num_of_threads; ++i)
    {
        const int rc = pthread_create(&threads_objects[i], 0, _thread_worker,
                                      static_cast<void*>(this));
        if (rc)
        {
            // Only the first i handles refer to real threads; drop the rest so
            // that join_all() never joins an uninitialised pthread_t.
            threads_objects.resize(i);

            // finish() acquires the mutex itself, so it must be released here;
            // calling it with the mutex held would deadlock.
            // NOTE(review): relies on LockGuard_pthread not unlocking again in
            // its destructor after an explicit unlock(), as finish() already
            // assumes.
            guard.unlock();
            this->finish();
            throw std::runtime_error("Unable to launch threads. Error code " + _to_string(rc));
        }
        threads_running++;
    }
    std::cout << "Launching " << num_of_threads << " threads successful." << std::endl;
}

/**
 * Joins every thread stored in threads_objects.
 * Does nothing if the threads have already been joined.
 */
void ThreadPool_pthread::join_all()
{
    if (!joined)
    {
        for (unsigned i = 0; i < threads_objects.size(); i++)
        {
            pthread_join(threads_objects[i], NULL);
        }
        joined = true;
    }
}

/**
 * Signals that no more tasks will be pushed, waits until every worker has
 * drained the queue and reported its exit, then joins all threads.
 * Calling it again afterwards has no further effect.
 */
void ThreadPool_pthread::finish()
{
    LockGuard_pthread guard(this->mutex);
    this->no_more_to_push = true;

    // wake up every worker that is waiting for a task so it can see the flag
    pthread_cond_broadcast(&this->task_queue_cond);

    // each exiting worker decrements threads_running and signals this condition
    while (threads_running > 0)
    {
        pthread_cond_wait(&this->thread_finished_cond, &this->mutex);
    }

    // release the mutex before joining
    guard.unlock();
    join_all();
}

/**
 * Finishes all outstanding work, joins the workers and destroys the
 * synchronization primitives.
 */
ThreadPool_pthread::~ThreadPool_pthread()
{
    this->finish();
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&task_queue_cond);
    pthread_cond_destroy(&thread_finished_cond);
}

/**
 * Initializes the synchronization primitives and launches the worker threads.
 *
 * @param threads_count number of worker threads to create.
 * @throws std::runtime_error if the threads cannot be launched
 *         (see launch_threads()).
 */
ThreadPool_pthread::ThreadPool_pthread(int threads_count)
{
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&task_queue_cond, 0);
    pthread_cond_init(&thread_finished_cond, 0);

    no_more_to_push = false;
    threads_running = 0;
    joined = false;
    num_of_threads = threads_count;

    try
    {
        launch_threads();
    }
    catch (...)
    {
        // The destructor does not run for an object whose constructor threw,
        // so release the primitives here. launch_threads() has already joined
        // any thread it managed to create, so nothing is using them any more.
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&task_queue_cond);
        pthread_cond_destroy(&thread_finished_cond);
        throw;
    }
}

/**
 * Appends a task to the queue and wakes up one waiting worker.
 *
 * @param func   function to execute on a worker thread.
 * @param params argument passed to func when it is executed.
 *
 * Note: a task pushed after finish() has completed stays in the queue and is
 * never executed, because all workers have exited by then.
 */
void ThreadPool_pthread::push_task(ThreadPool_pthread::FunctionType func, void *params)
{
    LockGuard_pthread guard(mutex);
    this->tasks_queue.push(std::make_pair(func, params));
    pthread_cond_signal(&task_queue_cond);
}