diff --git a/extension b/extension
index 5b9465a7d1..da36651683 160000
--- a/extension
+++ b/extension
@@ -1 +1 @@
-Subproject commit 5b9465a7d164c8214dcbdb6c9ddde75577ce9aa9
+Subproject commit da36651683b2dc3668fac4ac84487cfd80f44eb6
diff --git a/scripts/pgembed_ladybug_sample_query.py b/scripts/pgembed_ladybug_sample_query.py
new file mode 100644
index 0000000000..4f3d0783b2
--- /dev/null
+++ b/scripts/pgembed_ladybug_sample_query.py
@@ -0,0 +1,304 @@
+#!/usr/bin/env python3
+
+"""Reproduce the Postgres SQL_QUERY select-star checks against pgembed.
+
+Starts a temporary Postgres server with pgembed, creates a small
+``organisation`` table, attaches it to an in-memory Ladybug database via the
+postgres extension, and asserts the rows returned by several SQL_QUERY calls.
+
+Usage:
+
+    uv run --python 3.12 --with pgembed --with 'psycopg[binary]' \
+        python scripts/pgembed_ladybug_sample_query.py
+"""
+
+from __future__ import annotations
+
+import contextlib
+import importlib.util
+import os
+import platform
+import subprocess
+import sys
+import tempfile
+from pathlib import Path
+from urllib.parse import parse_qs, unquote, urlparse
+
+# Set in the environment of a uv-bootstrapped child so it never re-bootstraps.
+BOOTSTRAP_MARKER = "LBUG_PGEMBED_BOOTSTRAPPED"
+
+
+def is_musl() -> bool:
+    """Return True when the running libc is reported as musl."""
+    libc_name, _ = platform.libc_ver()
+    if libc_name == "musl":
+        return True
+    try:
+        # confstr may be missing, reject the name, or return None; each of
+        # those lands in the except clause and counts as "not musl".
+        return "musl" in os.confstr("CS_GNU_LIBC_VERSION").lower()
+    except (AttributeError, OSError, ValueError):
+        return False
+
+
+def uv_bootstrap_command() -> list[str]:
+    """Return the ``uv run`` argv that re-runs this script with its deps."""
+    return [
+        "uv",
+        "run",
+        "--python",
+        os.environ.get("PGEMBED_PYTHON", "3.12"),
+        "--with",
+        "pgembed",
+        "--with",
+        "psycopg[binary]",
+        "python",
+        __file__,
+    ]
+
+
+def bootstrap_env() -> dict[str, str]:
+    """Return a copy of the environment with the bootstrap marker set."""
+    env = os.environ.copy()
+    env[BOOTSTRAP_MARKER] = "1"
+    return env
+
+
+def import_pgembed_or_bootstrap():
+    """Import and return pgembed, re-executing under ``uv run`` if missing.
+
+    Raises:
+        ModuleNotFoundError: pgembed is missing and either libc is musl or
+            this process was already bootstrapped once.
+    """
+    try:
+        import pgembed
+
+        return pgembed
+    except ModuleNotFoundError:
+        if is_musl() or os.environ.get(BOOTSTRAP_MARKER) == "1":
+            raise
+        command = uv_bootstrap_command()
+        # execvpe replaces this process on success; a failure to launch uv
+        # is raised as OSError by the call itself.
+        os.execvpe(command[0], command, bootstrap_env())
+        raise RuntimeError("unreachable")
+
+
+def first_query_value(query: dict[str, list[str]], key: str) -> str | None:
+    """Return the first value parsed for *key*, or None if absent or empty."""
+    values = query.get(key)
+    return values[0] if values else None
+
+
+def quote_libpq_value(value: str) -> str:
+    """Quote *value* for use in a libpq keyword/value connection string.
+
+    Non-empty values without whitespace, backslashes, or single quotes are
+    returned unchanged; anything else is single-quoted with backslash escapes.
+    """
+    if value and not any(char.isspace() or char in "\\'" for char in value):
+        return value
+    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"
+
+
+def uri_to_libpq_connection_string(uri: str, database_name: str, user: str) -> str:
+    """Build a libpq keyword/value connection string from a Postgres URI.
+
+    Host, port, and password are taken from *uri* (host and port may also
+    come from its query string); the database name and user are always
+    *database_name* and *user*. The password falls back to "ci" and the
+    host to "localhost" when the URI does not provide them.
+    """
+    parsed = urlparse(uri)
+    query = parse_qs(parsed.query)
+    values = {
+        "dbname": database_name,
+        "user": user,
+        "host": parsed.hostname or first_query_value(query, "host") or "localhost",
+        "password": "ci",
+    }
+    port = parsed.port or first_query_value(query, "port")
+    if port is not None:
+        values["port"] = str(port)
+    if parsed.password:
+        values["password"] = unquote(parsed.password)
+    return " ".join(f"{key}={quote_libpq_value(value)}" for key, value in values.items())
+
+
+def repo_root() -> Path:
+    """Return the repository root (the parent of this script's directory)."""
+    return Path(__file__).resolve().parent.parent
+
+
+def default_c_api_lib_path(root: Path) -> Path:
+    """Return the default C API library path under the relwithdebinfo build.
+
+    NOTE(review): assumes non-macOS builds name the library ``liblbug.so``;
+    set LBUG_C_API_LIB_PATH to override.
+    """
+    lib_name = "liblbug.dylib" if sys.platform == "darwin" else "liblbug.so"
+    return root / "build" / "relwithdebinfo" / "src" / lib_name
+
+
+def import_ladybug_from_submodule(root: Path):
+    """Load the in-tree ``ladybug`` Python package and return the module.
+
+    Also defaults LBUG_C_API_LIB_PATH to the library built under *root* when
+    the variable is not already set.
+
+    Raises:
+        RuntimeError: no import spec or loader could be created.
+    """
+    package_dir = root / "tools" / "python_api" / "src_py"
+    os.environ.setdefault("LBUG_C_API_LIB_PATH", str(default_c_api_lib_path(root)))
+    spec = importlib.util.spec_from_file_location(
+        "ladybug",
+        package_dir / "__init__.py",
+        submodule_search_locations=[str(package_dir)],
+    )
+    if spec is None or spec.loader is None:
+        raise RuntimeError(f"failed to load ladybug package from {package_dir}")
+    module = importlib.util.module_from_spec(spec)
+    # Register the module under its name before running the package code.
+    sys.modules["ladybug"] = module
+    spec.loader.exec_module(module)
+    return module
+
+
+def run_ladybug_query(conn, query: str):
+    """Execute *query* on *conn* and return the result's ``get_all()`` value."""
+    result = conn.execute(query)
+    return result.get_all()
+
+
+def main() -> int:
+    """Run the repro and return 0 on success.
+
+    Raises:
+        RuntimeError: the postgres extension file does not exist.
+        AssertionError: a query returned rows other than the expected ones.
+    """
+    pgembed = import_pgembed_or_bootstrap()
+    import psycopg
+
+    root = repo_root()
+    lb = import_ladybug_from_submodule(root)
+
+    extension_path = root / "extension" / "postgres" / "build" / "libpostgres.lbug_extension"
+    if not extension_path.exists():
+        raise RuntimeError(f"missing postgres extension: {extension_path}")
+
+    with tempfile.TemporaryDirectory(prefix="lbug_pgembed_repro_") as tmpdir:
+        with pgembed.get_server(tmpdir) as pg:
+            admin_uri = pg.get_uri("postgres")
+            repro_uri = pg.get_uri("pgscan")
+
+            with psycopg.connect(admin_uri, autocommit=True) as conn:
+                conn.execute(
+                    """
+                    DO $$
+                    BEGIN
+                        IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = 'ci') THEN
+                            CREATE ROLE ci WITH LOGIN SUPERUSER PASSWORD 'ci';
+                        END IF;
+                    END
+                    $$;
+                    """
+                )
+                if (
+                    conn.execute(
+                        "SELECT 1 FROM pg_database WHERE datname = 'pgscan'"
+                    ).fetchone()
+                    is None
+                ):
+                    conn.execute("CREATE DATABASE pgscan OWNER ci")
+
+            with psycopg.connect(repro_uri) as conn:
+                conn.execute(
+                    """
+                    CREATE TABLE organisation (
+                        id BIGINT PRIMARY KEY,
+                        name TEXT NOT NULL,
+                        score BIGINT NOT NULL,
+                        mark DOUBLE PRECISION NOT NULL,
+                        orgcode BIGINT NOT NULL
+                    )
+                    """
+                )
+                conn.execute(
+                    """
+                    INSERT INTO organisation VALUES
+                        (1, 'ABFsUni', 4, 3.7, 325),
+                        (4, 'CsWork', 7, 4.1, 934),
+                        (6, 'DEsWork', 2, 4.1, 824)
+                    """
+                )
+                conn.commit()
+
+            conn_string = uri_to_libpq_connection_string(repro_uri, "pgscan", "ci")
+            # Close the connection before the database, and still close the
+            # database if creating the connection fails.
+            with contextlib.closing(lb.Database(":memory:", backend="capi")) as db:
+                with contextlib.closing(lb.Connection(db)) as conn:
+                    run_ladybug_query(conn, f"LOAD EXTENSION '{extension_path}'")
+                    run_ladybug_query(
+                        conn,
+                        f"ATTACH '{conn_string}' AS pg (dbtype POSTGRES)",
+                    )
+
+                    checks = [
+                        (
+                            "select-star-projection",
+                            "CALL SQL_QUERY('pg', 'select * from organisation') "
+                            "RETURN name, orgcode",
+                            [["ABFsUni", 325], ["CsWork", 934], ["DEsWork", 824]],
+                        ),
+                        (
+                            "select-star-reordered-projection",
+                            "CALL SQL_QUERY('pg', 'select * from organisation') "
+                            "RETURN orgcode, name",
+                            [[325, "ABFsUni"], [934, "CsWork"], [824, "DEsWork"]],
+                        ),
+                        (
+                            "select-star-filter-skipped-column",
+                            "CALL SQL_QUERY('pg', 'select * from organisation') "
+                            "WHERE score > 4 RETURN name, orgcode",
+                            [["CsWork", 934]],
+                        ),
+                        (
+                            "explicit-query-filter-skipped-column",
+                            "CALL SQL_QUERY('pg', "
+                            "'select name, score, mark, orgcode from organisation') "
+                            "WHERE score > 4 YIELD name, score, mark, orgcode AS code "
+                            "RETURN name, code",
+                            [["CsWork", 934]],
+                        ),
+                    ]
+
+                    for name, query, expected in checks:
+                        actual = run_ladybug_query(conn, query)
+                        print(f"{name}: {actual}")
+                        if actual != expected:
+                            raise AssertionError(f"{name}: expected {expected}, got {actual}")
+
+    print("postgres select-star repro passed")
+    return 0
+
+
+if __name__ == "__main__":
+    # Pre-flight: when the dependencies are missing, re-run under uv once.
+    # The marker in the child's environment prevents a bootstrap loop, and
+    # musl is skipped to match import_pgembed_or_bootstrap().
+    if (
+        "uv" not in Path(sys.executable).name
+        and os.environ.get(BOOTSTRAP_MARKER) != "1"
+        and not is_musl()
+    ):
+        try:
+            import pgembed  # noqa: F401
+            import psycopg  # noqa: F401
+        except ModuleNotFoundError:
+            completed = subprocess.run(uv_bootstrap_command(), env=bootstrap_env())
+            raise SystemExit(completed.returncode)
+    raise SystemExit(main())
diff --git a/src/optimizer/filter_push_down_optimizer.cpp b/src/optimizer/filter_push_down_optimizer.cpp
index f79b6ee21a..310717e429 100644
--- a/src/optimizer/filter_push_down_optimizer.cpp
+++ b/src/optimizer/filter_push_down_optimizer.cpp
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include
 
 #include "binder/expression/literal_expression.h"
 #include "binder/expression/property_expression.h"
@@ -235,9 +236,32 @@ std::shared_ptr FilterPushDownOptimizer::visitScanNodeTableRepl
 std::shared_ptr FilterPushDownOptimizer::visitTableFunctionCallReplace(
     const std::shared_ptr& op) {
     auto& tableFunctionCall = op->cast();
-    auto columnPredicates = getColumnPredicateSets(tableFunctionCall.getBindData()->columns,
-        predicateSet.getAllPredicates());
+    if (!tableFunctionCall.getTableFunc().supportsPushDownFunc()) {
+        return finishPushDown(op);
+    }
+    std::vector columnPredicates;
+    std::unordered_set pushedPredicates;
+    auto predicates = predicateSet.getAllPredicates();
+    for (auto& column : tableFunctionCall.getBindData()->columns) {
+        auto columnPredicateSet = ColumnPredicateSet();
+        for (auto& predicate : predicates) {
+            auto columnPredicate = ColumnPredicateUtil::tryConvert(*column, *predicate);
+            if (columnPredicate == nullptr) {
+                continue;
+            }
+            columnPredicateSet.addPredicate(std::move(columnPredicate));
+            pushedPredicates.insert(predicate.get());
+        }
+        columnPredicates.push_back(std::move(columnPredicateSet));
+    }
     tableFunctionCall.setColumnPredicates(std::move(columnPredicates));
+    auto remainingPredicates = PredicateSet();
+    for (auto& predicate : predicates) {
+        if (!pushedPredicates.contains(predicate.get())) {
+            remainingPredicates.addPredicate(predicate);
+        }
+    }
+    predicateSet = std::move(remainingPredicates);
     return finishPushDown(op);
 }
 
diff --git a/src/optimizer/foreign_join_push_down_optimizer.cpp b/src/optimizer/foreign_join_push_down_optimizer.cpp
index f2d3a87aa5..bf1c49ad63 100644
--- a/src/optimizer/foreign_join_push_down_optimizer.cpp
+++ b/src/optimizer/foreign_join_push_down_optimizer.cpp
@@ -10,6 +10,7 @@
 #include "common/exception/runtime.h"
 #include "main/database_manager.h"
 #include "planner/operator/extend/logical_extend.h"
+#include "planner/operator/logical_filter.h"
 #include "planner/operator/logical_flatten.h"
 #include "planner/operator/logical_hash_join.h"
 #include "planner/operator/logical_table_function_call.h"
@@ -132,6 +133,7 @@ struct ForeignJoinPatternInfo {
     // Intermediate operators
     const LogicalHashJoin* outerHashJoin = nullptr;
     const LogicalHashJoin* innerHashJoin = nullptr;
+    const LogicalFilter* relFilter = nullptr;
     // Original output schema
     const Schema* outputSchema = nullptr;
     // Table names extracted from bind data
@@ -221,6 +223,13 @@ static std::optional matchPattern(const LogicalOperator*
 
     // Inner hash join probe side should be EXTEND
     auto extendOp = probeOp->getChild(0).get();
+    if (extendOp != nullptr && extendOp->getOperatorType() == LogicalOperatorType::FILTER) {
+        info.relFilter = extendOp->constPtrCast();
+        if (extendOp->getNumChildren() < 1) {
+            return std::nullopt;
+        }
+        extendOp = extendOp->getChild(0).get();
+    }
     if (extendOp == nullptr || extendOp->getOperatorType() != LogicalOperatorType::EXTEND) {
         return std::nullopt;
     }
@@ -516,8 +525,16 @@ std::shared_ptr ForeignJoinPushDownOptimizer::visitHashJoinRepl
     // "a.id" when a canonical pattern-variable expression "_N_a.id" exists.
     auto allColumns = info.outputSchema->getExpressionsInScope();
     expression_vector outputColumns;
+    std::unordered_set outputColumnNames;
     std::unordered_set canonicalVarProps;
 
+    auto appendOutputColumn = [&](const std::shared_ptr& column) {
+        if (!outputColumnNames.insert(column->getUniqueName()).second) {
+            return;
+        }
+        outputColumns.push_back(column);
+    };
+
     auto extractCanonicalVarProp = [](const std::string& uniqueName) -> std::string {
         // "_N_var.prop" -> "var.prop"
         if (uniqueName.empty() || uniqueName[0] != '_') {
@@ -582,14 +599,30 @@ std::shared_ptr ForeignJoinPushDownOptimizer::visitHashJoinRepl
         if (hasLowercaseID(uniqueName)) {
             continue;
         }
-        outputColumns.push_back(col);
+        appendOutputColumn(col);
     }
 
+    // The foreign join rewrite runs before projection pushdown, so the matched
+    // hash join's schema can be narrower than parent FILTER/ORDER BY/PROJECTION
+    // requirements. Keep the available graph properties in the pushed-down scan;
+    // projection pushdown can prune unused columns later.
+    auto appendPatternProperties = [&](const std::shared_ptr& pattern) {
+        for (auto& property : pattern->getPropertyExpressions()) {
+            if (property->getPropertyName().starts_with("_")) {
+                continue;
+            }
+            appendOutputColumn(property);
+        }
+    };
+    appendPatternProperties(info.extend->getBoundNode());
+    appendPatternProperties(info.extend->getRel());
+    appendPatternProperties(info.extend->getNbrNode());
+
     // Fallback: if no property/variable columns were identified, preserve
     // original scope to avoid breaking operator replacement.
     if (outputColumns.empty()) {
         for (auto& col : allColumns) {
-            outputColumns.push_back(col);
+            appendOutputColumn(col);
         }
     }
 
@@ -603,6 +636,10 @@ std::shared_ptr ForeignJoinPushDownOptimizer::visitHashJoinRepl
         return op;
     }
 
+    if (info.relFilter != nullptr) {
+        result = std::make_shared(info.relFilter->getPredicate(), std::move(result));
+        result->computeFlatSchema();
+    }
     return result;
 }
 
diff --git a/src/optimizer/projection_push_down_optimizer.cpp b/src/optimizer/projection_push_down_optimizer.cpp
index 1152e64199..9f33043141 100644
--- a/src/optimizer/projection_push_down_optimizer.cpp
+++ b/src/optimizer/projection_push_down_optimizer.cpp
@@ -1,5 +1,7 @@
 #include "optimizer/projection_push_down_optimizer.h"
 
+#include
+
 #include "binder/expression_visitor.h"
 #include "function/gds/gds_function_collection.h"
 #include "function/gds/rec_joins.h"
@@ -167,6 +169,10 @@ void ProjectionPushDownOptimizer::visitOrderBy(LogicalOperator* op) {
     for (auto& expression : orderBy.getExpressionsToOrderBy()) {
         collectExpressionsInUse(expression);
     }
+    auto child = orderBy.getChild(0);
+    for (auto& expression : child->getSchema()->getExpressionsInScope()) {
+        collectExpressionsInUse(expression);
+    }
     auto expressionsBeforePruning = orderBy.getChild(0)->getSchema()->getExpressionsInScope();
     auto expressionsAfterPruning = pruneExpressions(expressionsBeforePruning);
     if (expressionsBeforePruning.size() == expressionsAfterPruning.size()) {
@@ -260,12 +266,18 @@ void ProjectionPushDownOptimizer::visitCopyFrom(LogicalOperator* op) {
 void ProjectionPushDownOptimizer::visitTableFunctionCall(LogicalOperator* op) {
     auto& tableFunctionCall = op->cast();
     std::vector columnSkips;
+    auto expressionInUseByName = [](const auto& expressionsInUse, const auto& column) {
+        return std::any_of(expressionsInUse.begin(), expressionsInUse.end(),
+            [&](const auto& expr) { return expr->getUniqueName() == column->getUniqueName(); });
+    };
     for (auto& column : tableFunctionCall.getBindData()->columns) {
         // Check both variablesInUse and propertiesInUse since foreign table columns
         // may be referenced as properties in the query (e.g., a.id) but represented
         // as variables in the table function bind data
-        columnSkips.push_back(
-            !variablesInUse.contains(column) && !propertiesInUse.contains(column));
+        const auto inUse = variablesInUse.contains(column) || propertiesInUse.contains(column) ||
+                           expressionInUseByName(variablesInUse, column) ||
+                           expressionInUseByName(propertiesInUse, column);
+        columnSkips.push_back(!inUse);
     }
     tableFunctionCall.setColumnSkips(std::move(columnSkips));
 }
diff --git a/src/processor/operator/simple/attach_database.cpp b/src/processor/operator/simple/attach_database.cpp
index 7cd14fb772..2e8dc217dc 100644
--- a/src/processor/operator/simple/attach_database.cpp
+++ b/src/processor/operator/simple/attach_database.cpp
@@ -51,12 +51,11 @@ void AttachDatabase::executeInternal(ExecutionContext* context) {
     }
     auto errMsg = std::format("No loaded extension can handle database type: {}.",
         attachInfo.dbType);
-    if (attachInfo.dbType == "duckdb") {
-        errMsg += "\nDid you forget to load duckdb extension?\nYou can load it by: load "
-                  "extension duckdb;";
-    } else if (attachInfo.dbType == "postgres") {
-        errMsg += "\nDid you forget to load postgres extension?\nYou can load it by: load "
-                  "extension postgres;";
+    auto dbType = common::StringUtils::getLower(attachInfo.dbType);
+    if (dbType == "duckdb" || dbType == "postgres" || dbType == "sqlite") {
+        errMsg += std::format("\nDid you forget to load {} extension?\nYou can load it by: load "
+                              "extension {};",
+            dbType, dbType);
     }
     throw common::RuntimeException{errMsg};
 }
diff --git a/test/test_files/ddl/ddl_empty.test b/test/test_files/ddl/ddl_empty.test
index 665afd9ae0..e5ff50eaba 100644
--- a/test/test_files/ddl/ddl_empty.test
+++ b/test/test_files/ddl/ddl_empty.test
@@ -13,6 +13,12 @@ Index idx_default_person_pk has been created.
 -STATEMENT CALL SHOW_INDEXES() RETURN table_name, index_name, property_names;
 ---- 1
 idx_default_person|idx_default_person_pk|[ID]
+-STATEMENT CALL SHOW_INDEXES() WHERE table_name = 'idx_default_person' RETURN table_name, index_name, property_names;
+---- 1
+idx_default_person|idx_default_person_pk|[ID]
+-STATEMENT CALL SHOW_INDEXES() WHERE table_name = 'missing_table' RETURN count(*);
+---- 1
+0
 -STATEMENT CREATE INDEX idx_default_person_pk IF NOT EXISTS FOR (p:idx_default_person) ON (p.ID);
 ---- 1
 Index idx_default_person_pk already exists.