ThreadPool_pthread/src/ThreadPool_pthread.cpp

118 lines
3.3 KiB
C++

#include "ThreadPool_pthread.h"
#include <string>
#include <stdexcept>
void* _thread_worker(void* pool_obj)
{
ThreadPool_pthread* pool = static_cast<ThreadPool_pthread*>(pool_obj);
while(true)
{
//variable to store a pointer to the function and the parameters
std::pair<ThreadPool_pthread::FunctionType,void*> func_and_args;
LockGuard_pthread guard(pool->mutex);
//if the queue isn't empty, and finish() isn't called, wait for more tasks
while (pool->tasks_queue.empty() && !pool->no_more_to_push)
{
pthread_cond_wait(&pool->task_queue_cond, &pool->mutex);
}
//pull task from queue and execute it if the queue isn't empty
if(!pool->tasks_queue.empty())
{
func_and_args = pool->tasks_queue.front();
pool->tasks_queue.pop();
guard.unlock();
//execute task
func_and_args.first(func_and_args.second);
guard.lock();
}
//if the queue is empty and a "finished" signal was given, exit the
//loop and signal that this thread is finished
if(pool->tasks_queue.empty() && pool->no_more_to_push)
{
pool->threads_running--;
pthread_cond_signal(&pool->thread_finished_cond);
break;
}
}
pthread_exit(NULL);
return NULL;
}
void ThreadPool_pthread::launch_threads()
{
threads_objects.resize(num_of_threads);
int rc;
std::cout << "Launching " << num_of_threads << " threads." << std::endl;
//prevent starting tasks while launching threads
LockGuard_pthread guard(mutex);
for (int i=0; i < num_of_threads; ++i)
{
rc = pthread_create(&threads_objects[i], 0, _thread_worker, static_cast<void*>(this));
if(rc)
{
this->finish();
throw std::runtime_error("Unable to launch threads. Error code " + _to_string(rc));
}
threads_running++;
}
std::cout << "Launching " << num_of_threads << " threads successful." << std::endl;
}
void ThreadPool_pthread::join_all()
{
if(!joined)
{
for(unsigned i = 0; i < threads_objects.size(); i++)
{
pthread_join(threads_objects[i],NULL);
}
joined = true;
}
}
void ThreadPool_pthread::finish()
{
LockGuard_pthread guard(this->mutex);
this->no_more_to_push = true;
pthread_cond_broadcast(&this->task_queue_cond);
while(threads_running > 0)
{
pthread_cond_wait(&this->thread_finished_cond, &this->mutex);
}
guard.unlock();
join_all();
}
ThreadPool_pthread::~ThreadPool_pthread()
{
this->finish();
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&task_queue_cond);
pthread_cond_destroy(&thread_finished_cond);
}
ThreadPool_pthread::ThreadPool_pthread(int threads_count)
{
pthread_mutex_init(&mutex, NULL);
no_more_to_push = false;
threads_running = 0;
joined = true;
pthread_cond_init(&task_queue_cond, 0);
pthread_cond_init(&thread_finished_cond, 0);
num_of_threads = threads_count;
joined = false;
no_more_to_push = false;
launch_threads();
}
void ThreadPool_pthread::push_task(ThreadPool_pthread::FunctionType func, void *params)
{
LockGuard_pthread guard(mutex);
this->tasks_queue.push(std::make_pair(func,params));
pthread_cond_signal(&task_queue_cond);
}