Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15)

project(Lbug VERSION 0.16.1 LANGUAGES CXX C)
project(Lbug VERSION 0.17.0 LANGUAGES CXX C)

option(SINGLE_THREADED "Single-threaded mode" FALSE)
if(SINGLE_THREADED)
Expand Down
47 changes: 45 additions & 2 deletions src/catalog/catalog_entry/node_table_catalog_entry.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,51 @@
#include "catalog/catalog_entry/node_table_catalog_entry.h"

#include "binder/ddl/bound_create_table_info.h"
#include "common/constants.h"
#include "common/serializer/buffered_file.h"
#include "common/serializer/deserializer.h"
#include "common/string_utils.h"
#include "storage/storage_version_info.h"
#include <format>

using namespace lbug::binder;

namespace lbug {
namespace catalog {

// Fills in `storageFormat` for catalog entries that were read without one.
//
// If the lower-cased `storage` value (via StringUtils::getLower) ends with
// "parquet", `storageFormat` is overwritten with
// TableOptionConstants::ICEBUG_DISK_FORMAT. Otherwise `storageFormat` is left
// exactly as the caller passed it in.
static void upgradeLegacyStorageFormat(const std::string& storage, std::string& storageFormat) {
    const bool isParquetBacked = common::StringUtils::getLower(storage).ends_with("parquet");
    if (!isParquetBacked) {
        return;
    }
    storageFormat = std::string(common::TableOptionConstants::ICEBUG_DISK_FORMAT);
}

// Attempts to read the storage-format string from `deserializer`.
//
// Returns true when a value was consumed into `storageFormat`. Returns false
// when the bytes at the current position do not look like a storage-format
// field; in that case the reader is rewound to where it started so the caller
// can continue as if nothing had been read.
//
// When the underlying reader is not a BufferedFileReader there is no way to
// rewind here, so the value is read unconditionally and true is returned.
static bool tryDeserializeStorageFormat(common::Deserializer& deserializer,
    std::string& storageFormat) {
    // Upper bound on a plausible format name; anything larger is treated as
    // "this is not a storage-format field".
    constexpr uint64_t MAX_STORAGE_FORMAT_LENGTH = 1024;

    auto* const bufferedReader =
        dynamic_cast<common::BufferedFileReader*>(deserializer.getReader());
    if (!bufferedReader) {
        deserializer.deserializeValue(storageFormat);
        return true;
    }

    // Remember the position so both rejection paths can undo the probe.
    const auto rewindOffset = bufferedReader->getReadOffset();

    // Read the length prefix first, then the payload bytes.
    // NOTE(review): presumably this mirrors the layout used by
    // deserializeValue(std::string&) - confirm against the serializer.
    uint64_t encodedLength = 0;
    deserializer.deserializeValue(encodedLength);
    if (encodedLength > MAX_STORAGE_FORMAT_LENGTH) {
        // `storageFormat` is intentionally left untouched on this path.
        bufferedReader->resetReadOffset(rewindOffset);
        return false;
    }

    storageFormat.resize(encodedLength);
    deserializer.read(reinterpret_cast<uint8_t*>(storageFormat.data()), encodedLength);

    // An empty value is accepted as-is; a non-empty one must be a recognised
    // format name.
    const bool accepted = storageFormat.empty() ||
                          common::TableOptionConstants::isIceBugDiskFormat(storageFormat);
    if (accepted) {
        return true;
    }

    bufferedReader->resetReadOffset(rewindOffset);
    storageFormat.clear();
    return false;
}

void NodeTableCatalogEntry::renameProperty(const std::string& propertyName,
const std::string& newName) {
TableCatalogEntry::renameProperty(propertyName, newName);
Expand Down Expand Up @@ -38,8 +74,15 @@ std::unique_ptr<NodeTableCatalogEntry> NodeTableCatalogEntry::deserialize(
deserializer.deserializeValue(primaryKeyName);
deserializer.validateDebuggingInfo(debuggingInfo, "storage");
deserializer.deserializeValue(storage);
deserializer.validateDebuggingInfo(debuggingInfo, "storageFormat");
deserializer.deserializeValue(storageFormat);
if (deserializer.getStorageVersion() >=
::lbug::storage::StorageVersionInfo::STORAGE_VERSION_41) {
deserializer.validateDebuggingInfo(debuggingInfo, "storageFormat");
if (!tryDeserializeStorageFormat(deserializer, storageFormat)) {
upgradeLegacyStorageFormat(storage, storageFormat);
}
} else {
upgradeLegacyStorageFormat(storage, storageFormat);
}
auto nodeTableEntry = std::make_unique<NodeTableCatalogEntry>();
nodeTableEntry->primaryKeyName = primaryKeyName;
nodeTableEntry->storage = storage;
Expand Down
47 changes: 45 additions & 2 deletions src/catalog/catalog_entry/rel_group_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

#include "binder/ddl/bound_create_table_info.h"
#include "catalog/catalog.h"
#include "common/constants.h"
#include "common/serializer/buffered_file.h"
#include "common/serializer/deserializer.h"
#include "common/string_utils.h"
#include "storage/storage_version_info.h"
#include "transaction/transaction.h"
#include <format>

Expand All @@ -14,6 +18,38 @@ using namespace lbug::main;
namespace lbug {
namespace catalog {

// Derives a storage format for entries that were read without one.
//
// `storage` is lower-cased with StringUtils::getLower; when the result ends
// with "parquet", `storageFormat` is set to
// TableOptionConstants::ICEBUG_DISK_FORMAT. In every other case
// `storageFormat` is not modified.
static void upgradeLegacyStorageFormat(const std::string& storage, std::string& storageFormat) {
    const bool isParquetBacked = common::StringUtils::getLower(storage).ends_with("parquet");
    if (!isParquetBacked) {
        return;
    }
    storageFormat = std::string(common::TableOptionConstants::ICEBUG_DISK_FORMAT);
}

// Probes the stream for a storage-format string and reads it if present.
//
// Returns true when a value was consumed into `storageFormat`; returns false
// after rewinding the reader when the bytes at the current position are
// rejected (length prefix too large, or a non-empty value that is not a
// recognised format name).
//
// Readers that are not a BufferedFileReader cannot be rewound here, so for
// them the value is read unconditionally and true is returned.
static bool tryDeserializeStorageFormat(Deserializer& deserializer, std::string& storageFormat) {
    // Largest length prefix still considered a plausible format name.
    constexpr uint64_t MAX_STORAGE_FORMAT_LENGTH = 1024;

    auto* const bufferedReader =
        dynamic_cast<common::BufferedFileReader*>(deserializer.getReader());
    if (!bufferedReader) {
        deserializer.deserializeValue(storageFormat);
        return true;
    }

    // Saved so that either rejection path can restore the read position.
    const auto rewindOffset = bufferedReader->getReadOffset();

    // Length prefix first, then the raw payload bytes.
    // NOTE(review): presumably matches how deserializeValue(std::string&)
    // lays out a string - confirm against the serializer.
    uint64_t encodedLength = 0;
    deserializer.deserializeValue(encodedLength);
    if (encodedLength > MAX_STORAGE_FORMAT_LENGTH) {
        // `storageFormat` is deliberately not touched on this path.
        bufferedReader->resetReadOffset(rewindOffset);
        return false;
    }

    storageFormat.resize(encodedLength);
    deserializer.read(reinterpret_cast<uint8_t*>(storageFormat.data()), encodedLength);

    // Empty is accepted; anything else must be a known format name.
    const bool accepted = storageFormat.empty() ||
                          common::TableOptionConstants::isIceBugDiskFormat(storageFormat);
    if (accepted) {
        return true;
    }

    bufferedReader->resetReadOffset(rewindOffset);
    storageFormat.clear();
    return false;
}

void RelGroupCatalogEntry::addFromToConnection(table_id_t srcTableID, table_id_t dstTableID,
oid_t oid) {
relTableInfos.emplace_back(NodeTableIDPair{srcTableID, dstTableID}, oid);
Expand Down Expand Up @@ -135,8 +171,15 @@ std::unique_ptr<RelGroupCatalogEntry> RelGroupCatalogEntry::deserialize(
}
deserializer.validateDebuggingInfo(debuggingInfo, "relTableInfos");
deserializer.deserializeVector(relTableInfos);
deserializer.validateDebuggingInfo(debuggingInfo, "storageFormat");
deserializer.deserializeValue(storageFormat);
if (deserializer.getStorageVersion() >=
::lbug::storage::StorageVersionInfo::STORAGE_VERSION_41) {
deserializer.validateDebuggingInfo(debuggingInfo, "storageFormat");
if (!tryDeserializeStorageFormat(deserializer, storageFormat)) {
upgradeLegacyStorageFormat(storage, storageFormat);
}
} else {
upgradeLegacyStorageFormat(storage, storageFormat);
}
auto relGroupEntry = std::make_unique<RelGroupCatalogEntry>();
relGroupEntry->srcMultiplicity = srcMultiplicity;
relGroupEntry->dstMultiplicity = dstMultiplicity;
Expand Down
17 changes: 9 additions & 8 deletions src/function/function_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,15 @@ FunctionCollection* FunctionCollection::getFunctions() {

// Table functions
TABLE_FUNCTION(CurrentSettingFunction), TABLE_FUNCTION(CatalogVersionFunction),
TABLE_FUNCTION(DBVersionFunction), TABLE_FUNCTION(ShowTablesFunction),
TABLE_FUNCTION(ShowGraphsFunction), TABLE_FUNCTION(FreeSpaceInfoFunction),
TABLE_FUNCTION(ShowWarningsFunction), TABLE_FUNCTION(TableInfoFunction),
TABLE_FUNCTION(ShowConnectionFunction), TABLE_FUNCTION(StatsInfoFunction),
TABLE_FUNCTION(StorageInfoFunction), TABLE_FUNCTION(ShowAttachedDatabasesFunction),
TABLE_FUNCTION(ShowSequencesFunction), TABLE_FUNCTION(ShowFunctionsFunction),
TABLE_FUNCTION(BMInfoFunction), TABLE_FUNCTION(FileInfoFunction),
TABLE_FUNCTION(DiskSizeInfoFunction), TABLE_FUNCTION(ShowLoadedExtensionsFunction),
TABLE_FUNCTION(DBVersionFunction), TABLE_FUNCTION(StorageVersionFunction),
TABLE_FUNCTION(ShowTablesFunction), TABLE_FUNCTION(ShowGraphsFunction),
TABLE_FUNCTION(FreeSpaceInfoFunction), TABLE_FUNCTION(ShowWarningsFunction),
TABLE_FUNCTION(TableInfoFunction), TABLE_FUNCTION(ShowConnectionFunction),
TABLE_FUNCTION(StatsInfoFunction), TABLE_FUNCTION(StorageInfoFunction),
TABLE_FUNCTION(ShowAttachedDatabasesFunction), TABLE_FUNCTION(ShowSequencesFunction),
TABLE_FUNCTION(ShowFunctionsFunction), TABLE_FUNCTION(BMInfoFunction),
TABLE_FUNCTION(FileInfoFunction), TABLE_FUNCTION(DiskSizeInfoFunction),
TABLE_FUNCTION(ShowLoadedExtensionsFunction),
TABLE_FUNCTION(ShowOfficialExtensionsFunction), TABLE_FUNCTION(ShowIndexesFunction),
TABLE_FUNCTION(ShowProjectedGraphsFunction), TABLE_FUNCTION(ProjectedGraphInfoFunction),
TABLE_FUNCTION(ShowMacrosFunction),
Expand Down
1 change: 1 addition & 0 deletions src/function/table/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ add_library(lbug_table_function
show_tables.cpp
show_warnings.cpp
stats_info.cpp
storage_version.cpp
storage_info.cpp
simple_table_function.cpp
table_function.cpp
Expand Down
46 changes: 46 additions & 0 deletions src/function/table/storage_version.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include "binder/binder.h"
#include "function/table/bind_data.h"
#include "function/table/simple_table_function.h"
#include "main/client_context.h"
#include "processor/execution_context.h"
#include "storage/database_header.h"
#include "storage/storage_manager.h"

namespace lbug {
namespace function {

// Emits the single result row of the STORAGE_VERSION table function.
//
// Writes the `storageVersion` field of the database header (obtained through
// StorageManager::getOrInitDatabaseHeader) into output column 0 at the first
// selected position, and reports that one row was produced. The morsel
// argument is unused.
static common::offset_t internalTableFunc(const TableFuncMorsel& /*morsel*/,
    const TableFuncInput& input, common::DataChunk& output) {
    auto& versionVector = output.getValueVectorMutable(0);
    const auto writePos = versionVector.state->getSelVector()[0];

    auto& clientContext = *input.context->clientContext;
    const auto* dbHeader =
        storage::StorageManager::Get(clientContext)->getOrInitDatabaseHeader(clientContext);

    versionVector.setValue(writePos, dbHeader->storageVersion);
    return 1;
}

// Bind step for the STORAGE_VERSION table function.
//
// Declares one UINT64 output column whose default name is "version", applies
// any YIELD renaming through TableFunction::extractYieldVariables, and returns
// bind data describing a one-row result. The client context is unused.
static std::unique_ptr<TableFuncBindData> bindFunc(const main::ClientContext*,
    const TableFuncBindInput* input) {
    std::vector<std::string> defaultColumnNames;
    defaultColumnNames.emplace_back("version");

    std::vector<common::LogicalType> columnTypes;
    columnTypes.emplace_back(common::LogicalType::UINT64());

    const std::vector<std::string> yieldedColumnNames =
        TableFunction::extractYieldVariables(defaultColumnNames, input->yieldVariables);

    auto boundColumns = input->binder->createVariables(yieldedColumnNames, columnTypes);
    return std::make_unique<TableFuncBindData>(std::move(boundColumns), 1 /* one row result */);
}

// Builds the function set for STORAGE_VERSION: one TableFunction that takes
// no parameters and is wired to this file's bind and execution callbacks.
function_set StorageVersionFunction::getFunctionSet() {
    auto storageVersionFunc =
        std::make_unique<TableFunction>(name, std::vector<common::LogicalTypeID>{});
    storageVersionFunc->tableFunc = SimpleTableFunc::getTableFunc(internalTableFunc);
    storageVersionFunc->bindFunc = bindFunc;
    storageVersionFunc->initSharedStateFunc = SimpleTableFunc::initSharedState;
    storageVersionFunc->initLocalStateFunc = TableFunction::initEmptyLocalState;

    function_set result;
    result.push_back(std::move(storageVersionFunc));
    return result;
}

} // namespace function
} // namespace lbug
5 changes: 5 additions & 0 deletions src/include/common/serializer/deserializer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <string>
Expand All @@ -20,6 +21,9 @@ class LBUG_API Deserializer {

// Returns whatever the underlying reader reports from its own finished().
bool finished() const { return reader->finished(); }

// Records the storage version associated with the data being deserialized,
// so that callers can branch on it via getStorageVersion().
void setStorageVersion(uint64_t version) { storageVersion = version; }
// Returns the storage version last set with setStorageVersion(). If it was
// never set, this is the member's initial value,
// std::numeric_limits<uint64_t>::max().
uint64_t getStorageVersion() const { return storageVersion; }

template<typename T>
requires std::is_trivially_destructible_v<T> || std::is_same_v<std::string, T>
void deserializeValue(T& value) {
Expand Down Expand Up @@ -139,6 +143,7 @@ class LBUG_API Deserializer {

private:
std::unique_ptr<Reader> reader;
uint64_t storageVersion = std::numeric_limits<uint64_t>::max();
};

template<>
Expand Down
6 changes: 6 additions & 0 deletions src/include/function/table/simple_table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ struct DBVersionFunction final {
static function_set getFunctionSet();
};

// Descriptor for the STORAGE_VERSION table function.
struct StorageVersionFunction final {
// Name under which the function is created in getFunctionSet().
static constexpr const char* name = "STORAGE_VERSION";

// Returns the function set containing the STORAGE_VERSION table function.
static function_set getFunctionSet();
};

struct ShowTablesFunction final {
static constexpr const char* name = "SHOW_TABLES";

Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/checkpointer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class Checkpointer {
DatabaseHeader checkpointHeader{};
// Whether storage had changes during checkpointStoragePhase.
bool hasStorageChanges = false;
// Whether this checkpoint upgrades the durable catalog/header storage layout.
bool hasStorageVersionUpgrade = false;
// Versions captured at the end of writeCheckpoint() while the write gate is still held.
uint64_t catalogVersionAtCheckpoint = 0;
uint64_t pageManagerVersionAtCheckpoint = 0;
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/database_header.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "common/types/uuid.h"
#include "storage/page_range.h"
#include "storage/storage_version_info.h"

namespace lbug {
namespace storage {
Expand All @@ -19,6 +20,8 @@ struct DatabaseHeader {
// Used to ensure that files such as the WAL match the current database
common::uuid databaseID{0};

storage_version_t storageVersion{StorageVersionInfo::getStorageVersion()};

void updateCatalogPageRange(PageManager& pageManager, PageRange newPageRange);
void freeMetadataPageRange(PageManager& pageManager) const;
void serialize(common::Serializer& ser) const;
Expand Down
7 changes: 6 additions & 1 deletion src/include/storage/storage_version_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct StorageVersionInfo {
// Storage version 40 spans the releases after 0.11.0 where the on-disk catalog/data format did
// not change.
static constexpr storage_version_t STORAGE_VERSION_40 = 40;
// Storage version 41 adds the table storage FORMAT field to catalog entries.
static constexpr storage_version_t STORAGE_VERSION_41 = 41;

static std::unordered_map<std::string, storage_version_t> getStorageVersionInfo() {
return {{"0.12.0", STORAGE_VERSION_40}, {"0.12.2", STORAGE_VERSION_40},
Expand All @@ -23,10 +25,13 @@ struct StorageVersionInfo {
{"0.15.0", STORAGE_VERSION_40}, {"0.15.1", STORAGE_VERSION_40},
{"0.15.2", STORAGE_VERSION_40}, {"0.15.3", STORAGE_VERSION_40},
{"0.15.4", STORAGE_VERSION_40}, {"0.16.0", STORAGE_VERSION_40},
{"0.16.1", STORAGE_VERSION_40}};
{"0.16.1", STORAGE_VERSION_40}, {"0.17.0", STORAGE_VERSION_41}};
}

static LBUG_API storage_version_t getStorageVersion();
// Returns true when `storageVersion` is either STORAGE_VERSION_40 or the
// value returned by getStorageVersion(); false for every other version.
static bool canReadStorageVersion(storage_version_t storageVersion) {
    if (storageVersion == STORAGE_VERSION_40) {
        return true;
    }
    return storageVersion == getStorageVersion();
}

static constexpr const char* MAGIC_BYTES = "LBUG";
};
Expand Down
9 changes: 8 additions & 1 deletion src/storage/checkpointer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "storage/database_header.h"
#include "storage/shadow_utils.h"
#include "storage/storage_manager.h"
#include "storage/storage_version_info.h"
#include "storage/wal/local_wal.h"
#include "transaction/transaction.h"

Expand Down Expand Up @@ -103,6 +104,9 @@ void Checkpointer::writeCheckpoint() {
walRotated = mainStorageManager->getWAL().rotateForCheckpoint(&clientContext);

auto databaseHeader = *mainStorageManager->getOrInitDatabaseHeader(clientContext);
const auto oldStorageVersion = databaseHeader.storageVersion;
databaseHeader.storageVersion = StorageVersionInfo::getStorageVersion();
hasStorageVersionUpgrade = oldStorageVersion != databaseHeader.storageVersion;
bool localHasStorageChanges = checkpointStorage();
serializeCatalogAndMetadata(databaseHeader, localHasStorageChanges);
databaseHeader.dataFileNumPages = mainStorageManager->getDataFH()->getNumPages();
Expand All @@ -127,6 +131,9 @@ void Checkpointer::beginCheckpoint(common::transaction_t snapshotTimestamp) {
walRotated = mainStorageManager->getWAL().rotateForCheckpoint(&clientContext);

checkpointHeader = *mainStorageManager->getOrInitDatabaseHeader(clientContext);
const auto oldStorageVersion = checkpointHeader.storageVersion;
checkpointHeader.storageVersion = StorageVersionInfo::getStorageVersion();
hasStorageVersionUpgrade = oldStorageVersion != checkpointHeader.storageVersion;

// Capture versions while the write gate is still held.
catalogVersionAtCheckpoint = clientContext.getDatabase()->getCatalog()->getVersion();
Expand Down Expand Up @@ -203,7 +210,7 @@ void Checkpointer::serializeCatalogAndMetadata(DatabaseHeader& databaseHeader,
const bool useSnapshot = snapshotTS > 0;

if (databaseHeader.catalogPageRange.startPageIdx == common::INVALID_PAGE_IDX ||
catalog->changedSinceLastCheckpoint()) {
catalog->changedSinceLastCheckpoint() || hasStorageVersionUpgrade) {
databaseHeader.updateCatalogPageRange(*dataFH->getPageManager(),
useSnapshot ? serializeCatalogSnapshot(*catalog, *mainStorageManager) :
serializeCatalog(*catalog, *mainStorageManager));
Expand Down
Loading
Loading