From 9dc12f8e5015596e5b130efb978d59236c16bcc0 Mon Sep 17 00:00:00 2001 From: TheQuantumPhysicist Date: Sat, 25 May 2019 17:56:33 +0200 Subject: [PATCH] Commit object pool --- CMakeLists.txt | 6 ++ ObjectPool.h | 170 +++++++++++++++++++++++++++++++++++++++++++++++++ main.cpp | 83 ++++++++++++++++++++++++ 3 files changed, 259 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 ObjectPool.h create mode 100644 main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..e462bd0 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,6 @@ +cmake_minimum_required(VERSION 2.8) + +project(ObjectPool) +add_executable(${PROJECT_NAME} "main.cpp") + +target_link_libraries(${PROJECT_NAME} -lpthread) diff --git a/ObjectPool.h b/ObjectPool.h new file mode 100644 index 0000000..223a7f5 --- /dev/null +++ b/ObjectPool.h @@ -0,0 +1,170 @@ +#ifndef OBJECTPOOL_H +#define OBJECTPOOL_H + +#include +#include +#include +#include +#include + +using AvailableObjectsQueueType = + boost::lockfree::queue>; + +template +class ObjectPool; + +template +class BorrowedObject +{ + T* obj; + ObjectPool* poolPtr; + const std::size_t objectIndex; + bool returned = false; + + BorrowedObject() = delete; + explicit BorrowedObject(T* Borrowed, ObjectPool* PoolPtr, std::size_t ObjIndex) + : obj(Borrowed), poolPtr(PoolPtr), objectIndex(ObjIndex) + { + } + +public: + friend ObjectPool; + void returnObj() + { + if (!returned && obj != nullptr) { + poolPtr->availableObjectsQueue.push(objectIndex); + poolPtr->availableObjectsCount++; + returned = true; + } + } + T* getObj() { return obj; } + ~BorrowedObject() { returnObj(); } +}; + +template +class ObjectPool +{ + AvailableObjectsQueueType availableObjectsQueue; + std::size_t objectCount; + std::vector objects; + std::atomic_uint64_t availableObjectsCount; + std::atomic_bool shutdownFlag; + +public: + friend BorrowedObject; + ObjectPool(std::size_t Size, std::function objectInitializers); + ObjectPool() = delete; + ObjectPool(const ObjectPool&) = delete; + ObjectPool(ObjectPool&&) = delete; + ObjectPool& operator=(const ObjectPool&) = delete; + ObjectPool& operator=(ObjectPool&&) = delete; + + BorrowedObject borrowObj(uint64_t sleepTime_ms = 0); + BorrowedObject try_borrowObj(bool& success); + std::size_t size() const; + uint64_t getAvailableObjectsCount() const; + std::vector& getInternalObjects_unsafe(); + void shutdown(); + ~ObjectPool(); +}; + +template +uint64_t ObjectPool::getAvailableObjectsCount() const +{ + return availableObjectsCount.load(); +} + +template +ObjectPool::ObjectPool(std::size_t Size, std::function objectInitializers) + : availableObjectsQueue(Size) +{ + shutdownFlag.store(false); + for (std::size_t i = 0; i < Size; i++) { + availableObjectsQueue.push(i); + } + objects.reserve(Size); + for (std::size_t i = 0; i < Size; i++) { + objects.push_back(objectInitializers(i)); + } + objectCount = Size; + availableObjectsCount.store(Size); +} + +/// Obj can be null only when shutting down +template +BorrowedObject ObjectPool::borrowObj(uint64_t sleepTime_ms) +{ + std::size_t currIndex = -1; + bool isShuttingDown = false; + while (!(isShuttingDown = shutdownFlag.load(std::memory_order_relaxed)) && + !availableObjectsQueue.pop(currIndex)) { + if (sleepTime_ms) { + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime_ms)); + } + } + if (!isShuttingDown) { + availableObjectsCount--; + BorrowedObject res(&objects[currIndex], this, currIndex); + assert(availableObjectsCount.load() >= 0); + return res; + } else { + BorrowedObject res(nullptr, this, currIndex); + assert(availableObjectsCount.load() >= 0); + return res; + } +} + +/// Obj can be null when shutting down or if the object is not available +template +BorrowedObject ObjectPool::try_borrowObj(bool& success) +{ + std::size_t currIndex = -1; + bool isShuttingDown = shutdownFlag.load(std::memory_order_relaxed); + if (!isShuttingDown && availableObjectsQueue.pop(currIndex)) { + success = true; + } else { + success = false; + } + if (!isShuttingDown && success) { + availableObjectsCount--; + BorrowedObject res(&objects[currIndex], this, currIndex); + assert(availableObjectsCount.load() >= 0); + return res; + } else { + BorrowedObject res(nullptr, this, currIndex); + assert(availableObjectsCount.load() >= 0); + return res; + } +} + +template +std::size_t ObjectPool::size() const +{ + return objectCount; +} + +template +std::vector& ObjectPool::getInternalObjects_unsafe() +{ + return objects; +} + +template +void ObjectPool::shutdown() +{ + // this can be called more than once + shutdownFlag.store(true); + while (availableObjectsCount != objectCount) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + availableObjectsQueue.consume_all([](std::size_t&) {}); + objects.clear(); +} + +template +ObjectPool::~ObjectPool() +{ + shutdown(); +} + +#endif // OBJECTPOOL_H diff --git a/main.cpp b/main.cpp new file mode 100644 index 0000000..70c12b3 --- /dev/null +++ b/main.cpp @@ -0,0 +1,83 @@ +#include "ObjectPool.h" +#include + +// this is a simple wrapper for gtest, you can move these tests to your gtests +#define TEST(a,b) +#define EXPECT_EQ(a__,b__) assert(a__ == b__); +#define EXPECT_NE(a__,b__) assert(a__ != b__); +#define EXPECT_TRUE(a__) assert(a__); +#define EXPECT_TRUE(a__) assert(a__); +#define EXPECT_FALSE(a__) assert(!a__); + +int main() { + TEST(Util, ObjectPool_stress_test) + { + std::size_t poolSize = 100; + std::size_t num_threads = 1000; + + std::vector> threads(num_threads); + + const int factor = 10; + ObjectPool p(poolSize, [](std::size_t z) { return factor * z; }); + for (std::unique_ptr& t : threads) { + t.reset(new std::thread([&p]() { + auto o = p.borrowObj(); + *o.getObj() = *o.getObj() + 1; + })); + } + + for (std::unique_ptr& t : threads) { + t->join(); + } + + int expectedSum = num_threads; // every thread adds 1 to the sum + int sum = 0; + for (std::size_t i = 0; i < poolSize; i++) { + expectedSum += factor * i; // initial value in pool member initializer + sum += *p.borrowObj().getObj(); + } + + EXPECT_EQ(sum, expectedSum); + } + + TEST(Util, ObjectPool_try_borrow) + { + std::size_t poolSize = 4; + + const int factor = 10; + ObjectPool p(poolSize, [](std::size_t z) { return factor * z; }); + bool success = false; + + EXPECT_EQ(p.getAvailableObjectsCount(), 4); + + BorrowedObject o1 = p.try_borrowObj(success); + EXPECT_TRUE(success); + EXPECT_EQ(p.getAvailableObjectsCount(), 3); + EXPECT_NE(o1.getObj(), nullptr); + + BorrowedObject o2 = p.try_borrowObj(success); + EXPECT_EQ(p.getAvailableObjectsCount(), 2); + EXPECT_TRUE(success); + EXPECT_NE(o2.getObj(), nullptr); + + BorrowedObject o3 = p.try_borrowObj(success); + EXPECT_EQ(p.getAvailableObjectsCount(), 1); + EXPECT_TRUE(success); + EXPECT_NE(o3.getObj(), nullptr); + + BorrowedObject o4 = p.try_borrowObj(success); + EXPECT_EQ(p.getAvailableObjectsCount(), 0); + EXPECT_TRUE(success); + EXPECT_NE(o4.getObj(), nullptr); + + BorrowedObject o5 = p.try_borrowObj(success); + EXPECT_FALSE(success); + EXPECT_EQ(o5.getObj(), nullptr); + + o5.returnObj(); + EXPECT_EQ(p.getAvailableObjectsCount(), 0); + + o4.returnObj(); + EXPECT_EQ(p.getAvailableObjectsCount(), 1); + } +}