Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion be/src/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ static bool has_file_cache_statistics(const io::FileCacheStatistics& stats) {
stats.inverted_index_bytes_read_from_remote != 0 ||
stats.inverted_index_bytes_read_from_peer != 0 ||
stats.inverted_index_local_io_timer != 0 || stats.inverted_index_remote_io_timer != 0 ||
stats.inverted_index_peer_io_timer != 0 || stats.inverted_index_io_timer != 0;
stats.inverted_index_peer_io_timer != 0 || stats.inverted_index_io_timer != 0 ||
stats.inverted_index_request_bytes != 0 || stats.inverted_index_read_bytes != 0 ||
stats.inverted_index_range_read_count != 0 ||
stats.inverted_index_serial_read_rounds != 0;
}

Status OlapScanner::_prepare_impl() {
Expand Down
17 changes: 17 additions & 0 deletions be/src/io/cache/block_file_cache_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ FileCacheStatistics diff_file_cache_statistics(const FileCacheStatistics& curren
SUBTRACT_FIELD(inverted_index_remote_io_timer);
SUBTRACT_FIELD(inverted_index_peer_io_timer);
SUBTRACT_FIELD(inverted_index_io_timer);
SUBTRACT_FIELD(inverted_index_request_bytes);
SUBTRACT_FIELD(inverted_index_read_bytes);
SUBTRACT_FIELD(inverted_index_range_read_count);
SUBTRACT_FIELD(inverted_index_serial_read_rounds);
#undef SUBTRACT_FIELD
return diff;
}
Expand Down Expand Up @@ -156,6 +160,14 @@ FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) {
ADD_CHILD_TIMER_WITH_LEVEL(profile, "InvertedIndexPeerIOUseTimer", cache_profile, 1);
inverted_index_io_timer =
ADD_CHILD_TIMER_WITH_LEVEL(profile, "InvertedIndexIOTimer", cache_profile, 1);
inverted_index_request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexRequestBytes", TUnit::BYTES, cache_profile, 1);
inverted_index_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "InvertedIndexReadBytes",
TUnit::BYTES, cache_profile, 1);
inverted_index_range_read_count = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexRangeReadCount", TUnit::UNIT, cache_profile, 1);
inverted_index_serial_read_rounds = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexSerialReadRounds", TUnit::UNIT, cache_profile, 1);
}

void FileCacheProfileReporter::update(const FileCacheStatistics* statistics) const {
Expand Down Expand Up @@ -193,6 +205,11 @@ void FileCacheProfileReporter::update(const FileCacheStatistics* statistics) con
COUNTER_UPDATE(inverted_index_remote_io_timer, statistics->inverted_index_remote_io_timer);
COUNTER_UPDATE(inverted_index_peer_io_timer, statistics->inverted_index_peer_io_timer);
COUNTER_UPDATE(inverted_index_io_timer, statistics->inverted_index_io_timer);
COUNTER_UPDATE(inverted_index_request_bytes, statistics->inverted_index_request_bytes);
COUNTER_UPDATE(inverted_index_read_bytes, statistics->inverted_index_read_bytes);
COUNTER_UPDATE(inverted_index_range_read_count, statistics->inverted_index_range_read_count);
COUNTER_UPDATE(inverted_index_serial_read_rounds,
statistics->inverted_index_serial_read_rounds);
}

} // namespace doris::io
5 changes: 4 additions & 1 deletion be/src/io/cache/block_file_cache_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class FileCacheMetrics {
void register_entity();
void update_metrics_callback();

private:
std::mutex _mtx;
// use shared_ptr for concurrent
std::shared_ptr<AtomicStatistics> _statistics;
Expand Down Expand Up @@ -97,6 +96,10 @@ struct FileCacheProfileReporter {
RuntimeProfile::Counter* inverted_index_remote_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_peer_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_request_bytes = nullptr;
RuntimeProfile::Counter* inverted_index_read_bytes = nullptr;
RuntimeProfile::Counter* inverted_index_range_read_count = nullptr;
RuntimeProfile::Counter* inverted_index_serial_read_rounds = nullptr;

FileCacheProfileReporter(RuntimeProfile* profile);
void update(const FileCacheStatistics* statistics) const;
Expand Down
4 changes: 4 additions & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ struct FileCacheStatistics {
int64_t inverted_index_remote_io_timer = 0;
int64_t inverted_index_peer_io_timer = 0;
int64_t inverted_index_io_timer = 0;
int64_t inverted_index_request_bytes = 0;
int64_t inverted_index_read_bytes = 0;
int64_t inverted_index_range_read_count = 0;
int64_t inverted_index_serial_read_rounds = 0;
};

struct IOContext {
Expand Down
39 changes: 39 additions & 0 deletions be/src/snii/common/slice.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#pragma once

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string_view>
#include <vector>

namespace snii {

// Non-owning, read-only view over a contiguous run of bytes. A Slice never
// copies or frees what it points at: the owner of the underlying buffer is
// responsible for keeping those bytes alive for as long as the view is used.
class Slice {
public:
    // Empty view: data() is nullptr and size() is 0.
    Slice() = default;
    // View over the n bytes starting at d.
    Slice(const uint8_t* d, size_t n) : data_(d), size_(n) {}
    // View over the current contents of v. Explicit, so a vector is never
    // turned into a borrowed view by accident.
    explicit Slice(const std::vector<uint8_t>& v) : data_(v.data()), size_(v.size()) {}
    // View over the characters of sv, reinterpreted as raw bytes.
    explicit Slice(std::string_view sv)
            : data_(reinterpret_cast<const uint8_t*>(sv.data())), size_(sv.size()) {}

    // Pointer to the first viewed byte (nullptr for a default-constructed view).
    const uint8_t* data() const { return data_; }
    // Number of bytes in the view.
    size_t size() const { return size_; }
    // True when the view covers zero bytes.
    bool empty() const { return size_ == 0; }

    // Byte at index i. The bound is enforced by assert only, so there is no
    // check at all when NDEBUG is defined.
    uint8_t operator[](size_t i) const {
        assert(i < size_);
        return data_[i];
    }

    // Sub-view of n bytes starting at offset off; requires [off, off + n) to lie
    // inside this view (assert-only check, like operator[]).
    Slice subslice(size_t off, size_t n) const {
        // Deliberately not written as `off + n <= size_`: that sum wraps around
        // for huge off/n and would let an out-of-range request slip through.
        assert(off <= size_ && n <= size_ - off);
        return Slice(data_ + off, n);
    }

private:
    const uint8_t* data_ = nullptr;
    size_t size_ = 0;
};

} // namespace snii
57 changes: 57 additions & 0 deletions be/src/snii/common/status.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#pragma once

#include <string>
#include <utility>

namespace snii {

// Outcome categories carried by Status. kOk is the only success value; every
// other enumerator names a kind of failure.
enum class StatusCode {
    kOk,
    kCorruption,
    kNotFound,
    kInvalidArgument,
    kIoError,
    kUnsupported,
    kInternal,
};

// Small value type describing the result of an operation. A default-constructed
// Status (or Status::OK()) means success and has an empty message; the named
// factories below build a failure from a code plus a human-readable message.
// Functions should return Status across API boundaries instead of failing silently.
class Status {
public:
    Status() = default;

    // Success value.
    static Status OK() { return {}; }

    // One factory per failure code; the message is moved into the result.
    static Status Corruption(std::string msg) {
        return {StatusCode::kCorruption, std::move(msg)};
    }
    static Status NotFound(std::string msg) {
        return {StatusCode::kNotFound, std::move(msg)};
    }
    static Status InvalidArgument(std::string msg) {
        return {StatusCode::kInvalidArgument, std::move(msg)};
    }
    static Status IoError(std::string msg) {
        return {StatusCode::kIoError, std::move(msg)};
    }
    static Status Unsupported(std::string msg) {
        return {StatusCode::kUnsupported, std::move(msg)};
    }
    static Status Internal(std::string msg) {
        return {StatusCode::kInternal, std::move(msg)};
    }

    // True only for the success code.
    bool ok() const { return StatusCode::kOk == code_; }
    StatusCode code() const { return code_; }
    const std::string& message() const { return message_; }
    // Textual rendering of this status; defined out of line.
    std::string to_string() const;

private:
    // Only the factories above may create a non-OK status.
    Status(StatusCode code, std::string msg) : code_(code), message_(std::move(msg)) {}

    StatusCode code_ = StatusCode::kOk;
    std::string message_;
};

} // namespace snii

// Evaluates expr exactly once into a ::snii::Status; when that status is not ok
// it is returned from the enclosing function, otherwise execution continues.
// Wrapped in do/while so the macro behaves as a single statement.
#define SNII_RETURN_IF_ERROR(expr)      \
    do {                                \
        ::snii::Status _s = (expr);     \
        if (!_s.ok()) {                 \
            return _s;                  \
        }                               \
    } while (false)
44 changes: 44 additions & 0 deletions be/src/snii/encoding/byte_sink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#pragma once

#include <cstdint>
#include <vector>

#include "snii/common/slice.h"

namespace snii {

// Append-only write cursor backed by a growable byte vector. Section
// serialization is expected to go through this class rather than assembling
// bytes by hand. Multi-byte fixed-width fields are written little-endian.
class ByteSink {
public:
    void put_u8(uint8_t v) { buf_.emplace_back(v); }
    void put_fixed16(uint16_t v);
    void put_fixed32(uint32_t v);
    void put_fixed64(uint64_t v);
    void put_varint32(uint32_t v);
    void put_varint64(uint64_t v);
    void put_zigzag(int64_t v);
    void put_bytes(Slice s);

    // Number of bytes written so far.
    size_t size() const { return buf_.size(); }
    // Direct read access to the backing vector.
    const std::vector<uint8_t>& buffer() const { return buf_; }
    // Borrowed view of the bytes written so far; it refers to the sink's own storage.
    Slice view() const { return Slice(buf_.data(), buf_.size()); }

    // Empties the sink but keeps the backing capacity, so one sink can be reused
    // for many small encodes (e.g. per-window region/prx scratch in the windowed
    // posting builder) without allocating again each time. The intent is to
    // avoid the small-allocation churn that fragments the heap arena and
    // inflates peak RSS while merging a high-df term split into thousands of
    // windows.
    void clear() { buf_.clear(); }

    // Hands the backing buffer to the caller by move, leaving the sink empty.
    // This skips the copy (and the capacity slack a copy brings) that reading
    // buffer() and copy-assigning would cost. Use it only when the sink is not
    // reused afterward: a stack-local about to die, or one that is clear()'d next.
    std::vector<uint8_t> take() {
        std::vector<uint8_t> released(std::move(buf_));
        return released;
    }

private:
    std::vector<uint8_t> buf_;
};

} // namespace snii
37 changes: 37 additions & 0 deletions be/src/snii/encoding/byte_source.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include <cstddef>
#include <cstdint>

#include "snii/common/slice.h"
#include "snii/common/status.h"

namespace snii {

// Read cursor over a Slice. Section deserialization is expected to go through
// this class; per its contract, a read that would run past the end of the
// slice reports Corruption instead of reading out of bounds.
class ByteSource {
public:
    explicit ByteSource(Slice s) : s_(s) {}

    Status get_u8(uint8_t* v);
    Status get_fixed16(uint16_t* v);
    Status get_fixed32(uint32_t* v);
    Status get_fixed64(uint64_t* v);
    Status get_varint32(uint32_t* v);
    Status get_varint64(uint64_t* v);
    Status get_zigzag(int64_t* v);
    Status get_bytes(size_t n, Slice* out);

    // Bytes left between the cursor and the end of the slice.
    size_t remaining() const {
        const size_t total = s_.size();
        return total - pos_;
    }
    // Current cursor offset, measured from the start of the slice.
    size_t position() const { return pos_; }
    // True once the cursor sits exactly at the end of the slice.
    bool eof() const { return remaining() == 0; }

    // Sub-view of len bytes at absolute offset start, independent of the cursor
    // (used by the framer etc. to look back over the CRC coverage region).
    // The range check is whatever Slice::subslice performs.
    Slice slice_from(size_t start, size_t len) const {
        return s_.subslice(start, len);
    }

private:
    Slice s_;
    size_t pos_ = 0;
};

} // namespace snii
16 changes: 16 additions & 0 deletions be/src/snii/encoding/crc32c.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <cstdint>

#include "snii/common/slice.h"

namespace snii {

// CRC32C (Castagnoli, polynomial 0x1EDC6F41). Used to checksum the tail of each
// format block. crc32c_extend takes a previously computed crc plus more data,
// so a checksum can be built up across several pieces.
uint32_t crc32c_extend(uint32_t crc, Slice data);

// One-shot checksum of data: extends from an initial crc value of 0.
inline uint32_t crc32c(Slice data) {
    constexpr uint32_t kInitialCrc = 0;
    return crc32c_extend(kInitialCrc, data);
}

} // namespace snii
22 changes: 22 additions & 0 deletions be/src/snii/encoding/pfor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once

#include <cstddef>
#include <cstdint>

#include "snii/common/status.h"
#include "snii/encoding/byte_sink.h"
#include "snii/encoding/byte_source.h"

namespace snii {

// PFOR block codec for arrays of unsigned 32-bit integers.
//
// Encoded layout:
//   [u8 bit_width][varint n_exceptions][bit-packed low bits][exception table]
// The encoder selects the bit_width that minimizes the total encoded size;
// values that do not fit in bit_width bits are recorded in the exception table
// as (index_delta, full_value) entries.
//
// Delta / zigzag transforms are the responsibility of the upper layer (the .frq
// window); PFOR itself only ever sees unsigned integer arrays.
//
// pfor_encode: appends the encoding of values[0..n) to *out.
// pfor_decode: reads one encoded block of n values from *src into out[0..n).
//   NOTE(review): out presumably must have room for n values — confirm against
//   the definition.
// pfor_skip: presumably advances *src past one encoded block of n values
//   without materializing them — confirm against the definition.
void pfor_encode(const uint32_t* values, size_t n, ByteSink* out);
Status pfor_decode(ByteSource* src, size_t n, uint32_t* out);
Status pfor_skip(ByteSource* src, size_t n);

} // namespace snii
27 changes: 27 additions & 0 deletions be/src/snii/encoding/section_framer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <cstdint>

#include "snii/common/slice.h"
#include "snii/common/status.h"
#include "snii/encoding/byte_sink.h"
#include "snii/encoding/byte_source.h"

namespace snii {

// Result of reading one framed section: its type tag plus a borrowed view of
// the payload bytes (the payload is a Slice, so it does not own its memory).
struct FramedSection {
    uint8_t type{0};
    Slice payload{};
};

// Unified section framing. One framed section is laid out as:
//   [u8 type][varint64 len][payload][fixed32 crc32c(type+len+payload)]
// All full-format sections are meant to reuse this single encode/checksum path
// instead of hand-assembling their own framing.
// Unknown optional sections are dispatched by the caller based on the type tag;
// read still verifies the CRC and skips over the payload for them.
class SectionFramer {
public:
// Appends one framed section with the given type tag and payload to sink,
// using the layout described above.
static void write(ByteSink& sink, uint8_t section_type, Slice payload);
// Reads one framed section from src and fills *out with its type and payload.
// NOTE(review): presumably returns a non-OK Status on truncation or CRC
// mismatch — confirm against the definition.
static Status read(ByteSource& src, FramedSection* out);
};

} // namespace snii
26 changes: 26 additions & 0 deletions be/src/snii/encoding/varint.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <cstddef>
#include <cstdint>

#include "snii/common/status.h"

namespace snii {

// LEB128 variable-length integer encoding (zigzag helpers live further down in
// this header).
// encode_varint32 / encode_varint64 write the encoding of v to out and return
// the number of bytes written; the out buffer must be at least 10 bytes.
// varint_len: presumably the number of bytes the encoding of v occupies —
// confirm against the definition.
size_t varint_len(uint64_t v);
size_t encode_varint32(uint32_t v, uint8_t* out);
size_t encode_varint64(uint64_t v, uint8_t* out);

// Decode one varint from the half-open byte range [p, end). On success the
// decoded value is stored in *v and *next points at the first byte after the
// consumed input.
// NOTE(review): the failure Status (e.g. for truncated or over-long input) and
// whether *v / *next are touched on failure are not visible here — confirm
// against the definitions.
Status decode_varint32(const uint8_t* p, const uint8_t* end, uint32_t* v, const uint8_t** next);
Status decode_varint64(const uint8_t* p, const uint8_t* end, uint64_t* v, const uint8_t** next);

// Zigzag-maps a signed value onto an unsigned one so that small magnitudes of
// either sign become small numbers: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
inline uint64_t zigzag_encode(int64_t v) {
    const auto shifted = static_cast<uint64_t>(v) << 1;
    // All ones for negative input, zero otherwise (the value of the sign bit
    // smeared across the whole word).
    const uint64_t sign_mask = (v < 0) ? ~uint64_t{0} : uint64_t{0};
    return shifted ^ sign_mask;
}
// Inverse of the zigzag mapping: even inputs decode to non-negative values,
// odd inputs to negative ones (0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...).
inline int64_t zigzag_decode(uint64_t v) {
    const auto magnitude = static_cast<int64_t>(v >> 1);
    // -1 (all ones) when the low bit is set, 0 otherwise.
    const int64_t sign_mask = (v & 1) ? int64_t{-1} : int64_t{0};
    return magnitude ^ sign_mask;
}

} // namespace snii
16 changes: 16 additions & 0 deletions be/src/snii/encoding/zstd_codec.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <cstddef>
#include <cstdint>
#include <vector>

#include "snii/common/slice.h"
#include "snii/common/status.h"

namespace snii {

// Thin ZSTD wrapper, used for compressing large payloads such as .prx windows.
//
// zstd_compress: compresses input at the given level; the result is delivered
// through *out.
// zstd_decompress: decompresses input into *out. The caller must supply the
// original uncompressed length (taken from the block header) as
// expected_uncomp_len.
// NOTE(review): whether *out is replaced or appended to, and how a length
// mismatch is reported, is not visible here — confirm against the definitions.
Status zstd_compress(Slice input, int level, std::vector<uint8_t>* out);
Status zstd_decompress(Slice input, size_t expected_uncomp_len, std::vector<uint8_t>* out);

} // namespace snii
Loading
Loading