118 lines
3.3 KiB
C++
118 lines
3.3 KiB
C++
#include "ThreadPool_pthread.h"
|
|
#include <string>
|
|
#include <stdexcept>
|
|
|
|
void* _thread_worker(void* pool_obj)
|
|
{
|
|
ThreadPool_pthread* pool = static_cast<ThreadPool_pthread*>(pool_obj);
|
|
while(true)
|
|
{
|
|
//variable to store a pointer to the function and the parameters
|
|
std::pair<ThreadPool_pthread::FunctionType,void*> func_and_args;
|
|
LockGuard_pthread guard(pool->mutex);
|
|
|
|
//if the queue isn't empty, and finish() isn't called, wait for more tasks
|
|
while (pool->tasks_queue.empty() && !pool->no_more_to_push)
|
|
{
|
|
pthread_cond_wait(&pool->task_queue_cond, &pool->mutex);
|
|
}
|
|
//pull task from queue and execute it if the queue isn't empty
|
|
if(!pool->tasks_queue.empty())
|
|
{
|
|
func_and_args = pool->tasks_queue.front();
|
|
pool->tasks_queue.pop();
|
|
|
|
guard.unlock();
|
|
|
|
//execute task
|
|
func_and_args.first(func_and_args.second);
|
|
guard.lock();
|
|
}
|
|
//if the queue is empty and a "finished" signal was given, exit the
|
|
//loop and signal that this thread is finished
|
|
if(pool->tasks_queue.empty() && pool->no_more_to_push)
|
|
{
|
|
pool->threads_running--;
|
|
pthread_cond_signal(&pool->thread_finished_cond);
|
|
break;
|
|
}
|
|
}
|
|
pthread_exit(NULL);
|
|
return NULL;
|
|
}
|
|
|
|
void ThreadPool_pthread::launch_threads()
|
|
{
|
|
threads_objects.resize(num_of_threads);
|
|
int rc;
|
|
std::cout << "Launching " << num_of_threads << " threads." << std::endl;
|
|
|
|
//prevent starting tasks while launching threads
|
|
LockGuard_pthread guard(mutex);
|
|
for (int i=0; i < num_of_threads; ++i)
|
|
{
|
|
rc = pthread_create(&threads_objects[i], 0, _thread_worker, static_cast<void*>(this));
|
|
if(rc)
|
|
{
|
|
this->finish();
|
|
throw std::runtime_error("Unable to launch threads. Error code " + _to_string(rc));
|
|
}
|
|
threads_running++;
|
|
}
|
|
std::cout << "Launching " << num_of_threads << " threads successful." << std::endl;
|
|
}
|
|
|
|
void ThreadPool_pthread::join_all()
|
|
{
|
|
if(!joined)
|
|
{
|
|
for(unsigned i = 0; i < threads_objects.size(); i++)
|
|
{
|
|
pthread_join(threads_objects[i],NULL);
|
|
}
|
|
joined = true;
|
|
}
|
|
}
|
|
|
|
void ThreadPool_pthread::finish()
|
|
{
|
|
LockGuard_pthread guard(this->mutex);
|
|
this->no_more_to_push = true;
|
|
pthread_cond_broadcast(&this->task_queue_cond);
|
|
while(threads_running > 0)
|
|
{
|
|
pthread_cond_wait(&this->thread_finished_cond, &this->mutex);
|
|
}
|
|
guard.unlock();
|
|
join_all();
|
|
}
|
|
|
|
ThreadPool_pthread::~ThreadPool_pthread()
|
|
{
|
|
this->finish();
|
|
pthread_mutex_destroy(&mutex);
|
|
pthread_cond_destroy(&task_queue_cond);
|
|
pthread_cond_destroy(&thread_finished_cond);
|
|
}
|
|
|
|
ThreadPool_pthread::ThreadPool_pthread(int threads_count)
|
|
{
|
|
pthread_mutex_init(&mutex, NULL);
|
|
no_more_to_push = false;
|
|
threads_running = 0;
|
|
joined = true;
|
|
pthread_cond_init(&task_queue_cond, 0);
|
|
pthread_cond_init(&thread_finished_cond, 0);
|
|
num_of_threads = threads_count;
|
|
joined = false;
|
|
no_more_to_push = false;
|
|
launch_threads();
|
|
}
|
|
|
|
void ThreadPool_pthread::push_task(ThreadPool_pthread::FunctionType func, void *params)
|
|
{
|
|
LockGuard_pthread guard(mutex);
|
|
this->tasks_queue.push(std::make_pair(func,params));
|
|
pthread_cond_signal(&task_queue_cond);
|
|
}
|