Commit 1a56e9bc authored by Samer Afach's avatar Samer Afach
Browse files

First commit

parents
CXX = g++
INC_DIR = include
OBJ_DIR = object
SRC_DIR = src
EXE_DIR = bin
CFLAGS = -O3 -ffast-math -std=c++98 -pedantic -pthread
TEST_FILE = $(EXE_DIR)/SumNums
TARGET = $(EXE_DIR)/SumNums
MKDIR_P = mkdir -p
SRC = $(SRC_DIR)/ThreadPool_pthread.cpp $(SRC_DIR)/LockGuard_pthread.cpp examples/sumnums.cpp
OBJ = $(OBJ_DIR)/ThreadPool_pthread.o $(OBJ_DIR)/LockGuard_pthread.o $(OBJ_DIR)/sumnums.o
DEP = $(INC_DIR)/ThreadPool_pthread.h $(INC_DIR)/LockGuard_pthread.h
all: $(TARGET)
$(OBJ): $(SRC) $(DEP)
$(MKDIR_P) $(@D)
$(CXX) $(CFLAGS) -I$(INC_DIR) -c $< -o $@
$(TARGET): $(OBJ) $(DEP)
$(MKDIR_P) $(@D)
$(CXX) $(CFLAGS) -o $@ $(SRC) -I$(INC_DIR)
clean:
$(RM) $(OBJ_DIR)/*.o $(TARGET) $(TEST_FILE)
QT -= core
QT -= gui
CONFIG += c++98
TARGET = Threadpool_pthread
CONFIG += console
CONFIG -= app_bundle
CONFIG -= qt
TEMPLATE = app
SOURCES += \
src/LockGuard_pthread.cpp \
src/ThreadPool_pthread.cpp \
examples/sumnums.cpp
HEADERS += \
include/LockGuard_pthread.h \
include/ThreadPool_pthread.h
OTHER_FILES += Makefile
INCLUDEPATH += include
LIBS += -pthread
#include "../include/ThreadPool_pthread.h"
#include <iostream>
#include <cstdlib>
void* SumZeroToNumber(void* num)
{
long val = 0;
for(long i = 0; i <= *static_cast<long*>(num); i++) {
val += i;
}
*static_cast<long*>(num) = val;
return NULL;
}
int main()
{
std::vector<long> nums(10000);
std::vector<long> numsResults(10000);
try
{
ThreadPool_pthread pool(8); //num of cores
for(unsigned i = 0; i < nums.size(); i++) {
nums[i] = rand() % 1000;
numsResults[i] = nums[i]; //this is just a copy to be passed and modified, while the original remains unchanged
pool.push_task(SumZeroToNumber,static_cast<void*>(&numsResults[i]));
}
pool.finish();
}
catch(std::exception &ex)
{
std::cout<<"An exception was thrown while processing data. Exception says: " << ex.what() << std::endl;
std::exit(1);
}
for (unsigned int i = 0; i < nums.size(); ++i) {
std::cout<<"Sum from 0 to " << nums[i] << " is: " << numsResults[i] <<std::endl;
}
return 0;
}
#ifndef LOCKGUARD_H
#define LOCKGUARD_H
#include "pthread.h"
/**
* @brief The LockGuard class
* Protects against deadlock by forcing unlocks when going out of scope
*
* The mechanism is quite simple. Constructor locks, destructor unlocks,
* and custom locks and unlocks are possible
*/
class LockGuard_pthread
{
pthread_mutex_t& _lock;
bool is_locked;
public:
LockGuard_pthread(pthread_mutex_t& LockRef);
~LockGuard_pthread();
void lock();
void unlock();
private:
LockGuard_pthread(LockGuard_pthread const &);
void operator=(LockGuard_pthread &);
};
#endif // LOCKGUARD_H
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <queue>
#include <vector>
#include <functional>
#include <string>
#include <iostream>
#include <sstream>
#include "pthread.h"
#include "LockGuard_pthread.h"
/**
* @brief The ThreadPool class
* A basic thread pool implementation using pthread
*
* Call with the constructor ThreadPool::ThreadPool(N) to get N threads.
* The default constructor will detect the number of available cores/core threads and use it.
* Constructor throws std::runtime_error() on failure to spawn processes.
*
*
* Add tasks using ThreadPool::push_task(function, args). More info at the function description.
*
* Use ThreadPool::finish() to wait for all tasks to process and join all threads.
*
*
*
*
*/
class ThreadPool_pthread
{
pthread_cond_t task_queue_cond; //signaled when a new task is pushed
pthread_cond_t thread_finished_cond; //signaled when a thread finishes and no more is there to do
pthread_mutex_t mutex;
bool no_more_to_push;
typedef void*(*FunctionType)(void*);
//queue of tasks as an std::queue of std::pair of functions and args
std::queue<std::pair<FunctionType,void*> > tasks_queue;
void launch_threads();
int num_of_threads; //stores the number of threads requested
std::vector<pthread_t> threads_objects;
friend void* _thread_worker(void*);
bool joined; //prevents rejoining twice, true when all threads are joined
long threads_running; //keeps track of threads that are executing tasks
void join_all();
public:
ThreadPool_pthread(int threads_count);
virtual ~ThreadPool_pthread();
/**
* @brief push_task
* Adds a task to the processing queue
*
* Results are passed through parameters too. No return value is considered.
*
* @param function
* Function pointer to the function to be executed.
* The function is of type void*(void*)
* All parameters should be passed through a single void pointer
*
* @param params
* Parameters of the function as void*.
*/
void push_task(FunctionType function, void* params);
/**
* @brief finish
* Waits for all threads to finish and joins them.
*/
void finish();
};
template <typename T>
std::string _to_string(T const& value)
{
std::stringstream sstr;
sstr << value;
return sstr.str();
}
#endif // THREADPOOL_H
#include "LockGuard_pthread.h"
LockGuard_pthread::LockGuard_pthread(pthread_mutex_t &LockRef) : _lock(LockRef), is_locked(false)
{
lock();
}
LockGuard_pthread::~LockGuard_pthread()
{
unlock();
}
void LockGuard_pthread::lock()
{
if(!is_locked)
{
pthread_mutex_lock(&_lock);
is_locked = true;
}
}
void LockGuard_pthread::unlock()
{
if(is_locked)
{
pthread_mutex_unlock(&_lock);
is_locked = false;
}
}
#include "ThreadPool_pthread.h"
#include <string>
#include <stdexcept>
void* _thread_worker(void* pool_obj)
{
ThreadPool_pthread* pool = static_cast<ThreadPool_pthread*>(pool_obj);
while(true)
{
//variable to store a pointer to the function and the parameters
std::pair<ThreadPool_pthread::FunctionType,void*> func_and_args;
LockGuard_pthread guard(pool->mutex);
//if the queue isn't empty, and finish() isn't called, wait for more tasks
while (pool->tasks_queue.empty() && !pool->no_more_to_push)
{
pthread_cond_wait(&pool->task_queue_cond, &pool->mutex);
}
//pull task from queue and execute it if the queue isn't empty
if(!pool->tasks_queue.empty())
{
func_and_args = pool->tasks_queue.front();
pool->tasks_queue.pop();
guard.unlock();
//execute task
func_and_args.first(func_and_args.second);
guard.lock();
}
//if the queue is empty and a "finished" signal was given, exit the
//loop and signal that this thread is finished
if(pool->tasks_queue.empty() && pool->no_more_to_push)
{
pool->threads_running--;
pthread_cond_signal(&pool->thread_finished_cond);
break;
}
}
pthread_exit(NULL);
return NULL;
}
void ThreadPool_pthread::launch_threads()
{
threads_objects.resize(num_of_threads);
int rc;
std::cout << "Launching " << num_of_threads << " threads." << std::endl;
//prevent starting tasks while launching threads
LockGuard_pthread guard(mutex);
for (int i=0; i < num_of_threads; ++i)
{
rc = pthread_create(&threads_objects[i], 0, _thread_worker, static_cast<void*>(this));
if(rc)
{
this->finish();
throw std::runtime_error("Unable to launch threads. Error code " + _to_string(rc));
}
threads_running++;
}
std::cout << "Launching " << num_of_threads << " threads successful." << std::endl;
}
void ThreadPool_pthread::join_all()
{
if(!joined)
{
for(unsigned i = 0; i < threads_objects.size(); i++)
{
pthread_join(threads_objects[i],NULL);
}
joined = true;
}
}
void ThreadPool_pthread::finish()
{
LockGuard_pthread guard(this->mutex);
this->no_more_to_push = true;
pthread_cond_broadcast(&this->task_queue_cond);
while(threads_running > 0)
{
pthread_cond_wait(&this->thread_finished_cond, &this->mutex);
}
guard.unlock();
join_all();
}
ThreadPool_pthread::~ThreadPool_pthread()
{
this->finish();
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&task_queue_cond);
pthread_cond_destroy(&thread_finished_cond);
}
ThreadPool_pthread::ThreadPool_pthread(int threads_count)
{
pthread_mutex_init(&mutex, NULL);
no_more_to_push = false;
threads_running = 0;
joined = true;
pthread_cond_init(&task_queue_cond, 0);
pthread_cond_init(&thread_finished_cond, 0);
num_of_threads = threads_count;
joined = false;
no_more_to_push = false;
launch_threads();
}
void ThreadPool_pthread::push_task(ThreadPool_pthread::FunctionType func, void *params)
{
LockGuard_pthread guard(mutex);
this->tasks_queue.push(std::make_pair(func,params));
pthread_cond_signal(&task_queue_cond);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment