Skip to content

Commit 0c96932

Browse files
authored
[Issue pixelsdb#973] implement a buffer pool for pixels cpp (pixelsdb#1101)
1. update pixels-extension 2. implement bufferpool 3. use hugepage 4. update pixels-cpp.properties
1 parent 5b6561d commit 0c96932

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2616
-413
lines changed

.gitmodules

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,7 @@
77
[submodule "cpp/third-party/googletest"]
88
path = cpp/third-party/googletest
99
url = git@github.com:google/googletest.git
10+
[submodule "cpp/FlameGraph"]
11+
path = cpp/third-party/FlameGraph
12+
url = git@github.com:brendangregg/FlameGraph.git
13+

cpp/.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,8 @@ plot
55
**/cmake-build-debug
66
**/CMakeCache.txt
77
**/CMakeFiles
8+
# ignore perf svg files
9+
cpp/testcase/perf-*/*.svg/*.svg
10+
*.csv
11+
*.txt
12+
*.svg

cpp/Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,8 @@ debug: deps
5757

5858
release: deps
5959
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Release ${BUILD_FLAGS} -S pixels-duckdb/duckdb -B build/release && \
60-
cmake --build build/release --config Release
60+
cmake --build build/release --config Release
61+
62+
relWithDebInfo: deps
63+
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=RelWithDebInfo ${BUILD_FLAGS} -S pixels-duckdb/duckdb -B build/relWithDebInfo && \
64+
cmake --build build/relWithDebInfo --config RelWithDebInfo

cpp/include/PixelsReadBindData.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ namespace duckdb
4242
std::shared_ptr <PixelsReader> initialPixelsReader;
4343
std::shared_ptr <TypeDescription> fileSchema;
4444
vector <string> files;
45-
atomic <idx_t> curFileId;
45+
atomic <int> curFileId;
4646
};
4747

4848
}

cpp/include/PixelsReadGlobalState.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ namespace duckdb
4040
{
4141
mutex lock;
4242

43+
atomic<int> active_threads; // Number of active threads
44+
atomic<bool> all_done; // Whether all threads have completed
45+
4346
//! The initial reader from the bind phase
4447
std::shared_ptr <PixelsReader> initialPixelsReader;
4548

cpp/include/PixelsScanFunction.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ namespace duckdb
113113
PixelsScanInitLocal(ExecutionContext &context, TableFunctionInitInput &input,
114114
GlobalTableFunctionState *gstate_p);
115115

116-
static bool PixelsParallelStateNext(ClientContext &context, const PixelsReadBindData &bind_data,
116+
static bool PixelsParallelStateNext(ClientContext &context, PixelsReadBindData &bind_data,
117117
PixelsReadLocalState &scan_data, PixelsReadGlobalState &parallel_state,
118118
bool is_init_state = false);
119119

cpp/pixels-common/CMakeLists.txt

Lines changed: 15 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,50 +4,21 @@ set(CMAKE_CXX_STANDARD 17)
44

55
include(ExternalProject)
66

7-
set(pixels_common_cxx
8-
lib/physical/storage/LocalFS.cpp
9-
lib/physical/storage/LocalFSProvider.cpp
10-
lib/physical/storage/PhysicalLocalWriter.cpp
11-
lib/physical/PhysicalWriterOption.cpp
12-
lib/physical/Status.cpp
13-
lib/physical/Storage.cpp
14-
lib/physical/FilePath.cpp
15-
lib/physical/natives/PixelsRandomAccessFile.cpp
16-
lib/physical/natives/DirectRandomAccessFile.cpp
17-
lib/physical/natives/ByteBuffer.cpp
18-
lib/physical/io/PhysicalLocalReader.cpp
19-
lib/physical/StorageFactory.cpp
20-
lib/physical/Request.cpp
21-
lib/physical/RequestBatch.cpp
22-
lib/physical/scheduler/NoopScheduler.cpp
23-
lib/physical/SchedulerFactory.cpp
24-
lib/exception/InvalidArgumentException.cpp
25-
lib/utils/Constants.cpp
26-
lib/utils/String.cpp
27-
include/physical/natives/DirectIoLib.h
28-
lib/physical/natives/DirectIoLib.cpp
29-
include/utils/ConfigFactory.h
30-
lib/utils/ConfigFactory.cpp
31-
include/physical/MergedRequest.h
32-
include/physical/scheduler/SortMergeScheduler.h
33-
lib/physical/scheduler/SortMergeScheduler.cpp
34-
lib/MergedRequest.cpp include/profiler/TimeProfiler.h
35-
lib/profiler/TimeProfiler.cpp
36-
include/profiler/CountProfiler.h
37-
lib/profiler/CountProfiler.cpp
38-
include/profiler/AbstractProfiler.h
39-
include/physical/allocator/Allocator.h
40-
include/physical/allocator/OrdinaryAllocator.h
41-
lib/physical/allocator/OrdinaryAllocator.cpp
42-
include/physical/allocator/BufferPoolAllocator.h
43-
lib/physical/allocator/BufferPoolAllocator.cpp
44-
include/physical/BufferPool.h
45-
lib/physical/BufferPool.cpp
46-
include/physical/natives/DirectUringRandomAccessFile.h
47-
lib/physical/natives/DirectUringRandomAccessFile.cpp
48-
include/utils/ColumnSizeCSVReader.h lib/utils/ColumnSizeCSVReader.cpp
49-
include/physical/StorageArrayScheduler.h lib/physical/StorageArrayScheduler.cpp
50-
include/physical/natives/ByteOrder.h
7+
8+
file(GLOB_RECURSE pixels_common_cxx
9+
"lib/physical/*.cpp"
10+
"lib/physical/*.h"
11+
"lib/exception/*.cpp"
12+
"lib/exception/*.h"
13+
"lib/utils/*.cpp"
14+
"lib/utils/*.h"
15+
"lib/profiler/*.cpp"
16+
"lib/profiler/*.h"
17+
"include/physical/*.h"
18+
"include/profiler/*.h"
19+
"include/utils/*.h"
20+
"include/physical/BufferPool/*.h"
21+
"lib/MergedRequest.cpp"
5122
)
5223

5324
include_directories(include)

cpp/pixels-common/include/physical/BufferPool.h

Lines changed: 149 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,49 +25,179 @@
2525
#ifndef DUCKDB_BUFFERPOOL_H
2626
#define DUCKDB_BUFFERPOOL_H
2727

28-
#include <iostream>
29-
#include <vector>
28+
#include "exception/InvalidArgumentException.h"
29+
#include "physical/BufferPool/Bitmap.h"
30+
#include "physical/BufferPool/BufferPoolEntry.h"
3031
#include "physical/natives/ByteBuffer.h"
31-
#include <memory>
3232
#include "physical/natives/DirectIoLib.h"
33-
#include "exception/InvalidArgumentException.h"
3433
#include "utils/ColumnSizeCSVReader.h"
34+
#include <cstdio>
35+
#include <iostream>
3536
#include <map>
36-
37-
// when allocating buffer pool, we use the size of the first pxl file. Consider that
38-
// the remaining pxl file has larger size than the first file, we allocate some extra
39-
// size (10MB) to each column.
37+
#include <memory>
38+
#include <mutex>
39+
#include <thread>
40+
#include <vector>
41+
// When allocating the buffer pool, we use the size of the first pxl file.
42+
// Since the remaining pxl files may be larger than the first file, we allocate
43+
// some extra size (10MB) for each column.
4044
// TODO: how to evaluate the maximal pool size
41-
#define EXTRA_POOL_SIZE 3*1024*1024
45+
#define EXTRA_POOL_SIZE 10 * 1024 * 1024
4246

4347
class DirectUringRandomAccessFile;
48+
4449
// This is a global (all-static) class. Most of its state is declared thread_local, so each thread has its own copy.
4550
class BufferPool
4651
{
4752
public:
48-
static void
49-
Initialize(std::vector <uint32_t> colIds, std::vector <uint64_t> bytes, std::vector <std::string> columnNames);
5053

51-
static std::shared_ptr <ByteBuffer> GetBuffer(uint32_t colId);
54+
class BufferPoolManagedEntry
55+
{
56+
public:
57+
enum class State
58+
{
59+
InitizaledNotAllocated,
60+
AllocatedAndInUse,
61+
UselessButNotFree
62+
};
63+
64+
private:
65+
std::shared_ptr<BufferPoolEntry> bufferPoolEntry;
66+
int ringIndex;
67+
size_t currentSize;
68+
int offset;
69+
State state;
70+
71+
public:
72+
BufferPoolManagedEntry(std::shared_ptr<BufferPoolEntry> entry, int ringIdx,
73+
size_t currSize, off_t off)
74+
: bufferPoolEntry(std::move(entry)), ringIndex(ringIdx),
75+
currentSize(currSize), offset(off),
76+
state(State::InitizaledNotAllocated)
77+
{
78+
}
79+
80+
std::shared_ptr<BufferPoolEntry> getBufferPoolEntry() const
81+
{
82+
return bufferPoolEntry;
83+
}
84+
85+
int getRingIndex() const
86+
{
87+
return ringIndex;
88+
}
89+
90+
void setRingIndex(int index)
91+
{
92+
ringIndex = index;
93+
}
94+
95+
size_t getCurrentSize() const
96+
{
97+
return currentSize;
98+
}
99+
100+
void setCurrentSize(size_t size)
101+
{
102+
currentSize = size;
103+
}
104+
105+
int getOffset() const
106+
{
107+
return offset;
108+
}
52109

53-
static int64_t GetBufferId(uint32_t index);
110+
void setOffset(int off)
111+
{
112+
offset = off;
113+
}
114+
115+
State getStatus() const
116+
{
117+
return state;
118+
}
119+
120+
void setStatus(State newStatus)
121+
{
122+
state = newStatus;
123+
}
124+
};
125+
126+
static void Initialize(std::vector<uint32_t> colIds,
127+
std::vector<uint64_t> bytes,
128+
std::vector<std::string> columnNames);
129+
130+
static void InitializeBuffers();
131+
132+
static std::shared_ptr<ByteBuffer> GetBuffer(uint32_t colId, uint64_t byte,
133+
std::string columnName);
134+
135+
static int64_t GetBufferId();
54136

55137
static void Switch();
56138

57139
static void Reset();
58140

141+
static std::shared_ptr<BufferPoolEntry> AddNewBuffer(size_t size);
142+
143+
static int getRingIndex(uint32_t colId);
144+
145+
static std::shared_ptr<ByteBuffer> AllocateNewBuffer(
146+
std::shared_ptr<BufferPoolManagedEntry> currentBufferManagedEntry,
147+
uint32_t colId, uint64_t byte, std::string columnName);
148+
149+
static std::shared_ptr<ByteBuffer> ReusePreviousBuffer(
150+
std::shared_ptr<BufferPoolManagedEntry> currentBufferManagedEntry,
151+
uint32_t colId, uint64_t byte, std::string columnName);
152+
153+
static void PrintStats()
154+
{
155+
// Get the ID of the current thread
156+
std::thread::id tid = std::this_thread::get_id();
157+
158+
// Print global buffer usage: used size / free size
159+
// Convert thread ID to integer for readability using hash
160+
printf("Thread %zu -> Global buffer usage: %ld / %ld\n",
161+
std::hash<std::thread::id>{}(tid), globalUsedSize,
162+
globalFreeSize);
163+
164+
// Print thread-local statistics for Buffer0
165+
printf("Thread %zu -> Buffer0 usage: %zu, Buffer count: %d\n",
166+
std::hash<std::thread::id>{}(tid), threadLocalUsedSize[0],
167+
threadLocalBufferCount[0]);
168+
169+
// Print thread-local statistics for Buffer1
170+
printf("Thread %zu -> Buffer1 usage: %zu, Buffer count: %d\n",
171+
std::hash<std::thread::id>{}(tid), threadLocalUsedSize[1],
172+
threadLocalBufferCount[1]);
173+
}
59174
private:
60175
BufferPool() = default;
176+
// global
177+
static std::mutex bufferPoolMutex;
61178

62-
static thread_local int colCount;
63-
static thread_local std::map<uint32_t, uint64_t>
64-
nrBytes;
179+
// thread local
65180
static thread_local bool isInitialized;
66-
static thread_local std::map<uint32_t, std::shared_ptr < ByteBuffer>>
67-
buffers[2];
68-
static std::shared_ptr <DirectIoLib> directIoLib;
181+
static thread_local std::vector<std::shared_ptr<BufferPoolEntry>>
182+
registeredBuffers[2];
183+
static thread_local long globalUsedSize;
184+
static thread_local long globalFreeSize;
185+
static thread_local std::shared_ptr<DirectIoLib> directIoLib;
186+
static thread_local int nextRingIndex;
187+
static thread_local std::shared_ptr<BufferPoolEntry>
188+
nextEmptyBufferPoolEntry[2];
189+
static thread_local int colCount;
69190
static thread_local int currBufferIdx;
70191
static thread_local int nextBufferIdx;
192+
static thread_local std::map<uint32_t, std::shared_ptr<ByteBuffer>>
193+
buffersAllocated[2];
71194
friend class DirectUringRandomAccessFile;
195+
196+
static thread_local std::unordered_map<
197+
uint32_t, std::shared_ptr<BufferPoolManagedEntry>>
198+
ringBufferMap[2];
199+
200+
static thread_local size_t threadLocalUsedSize[2];
201+
static thread_local int threadLocalBufferCount[2];
72202
};
73203
#endif // DUCKDB_BUFFERPOOL_H

0 commit comments

Comments
 (0)