refactor: remove Java-side dataset cache, rely on Rust-side Session#353
Conversation
hamersaw
left a comment
There was a problem hiding this comment.
Do you think it would make sense to just remove the LanceDatasetCache and house fragment caching in the LanceRuntime? It feels like there's just multiple layers of cache here.
625df7c followed this approach |
|
I think overall this looks reasonable, but we need some relatively comprehensive benchmarks to support any performance implications. |
…ches-335 # Conflicts: # lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceDatasetCache.java # lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java
Replace non-existent LanceNamespaceStorageOptionsProvider with LanceNamespace from getOrCreateNamespace(), matching the namespace API used by OpenDatasetBuilder. Add missing java.util.List import.
Adds FragmentLoadingBenchmarkTest to quantify the performance difference between getFragments() (old eager approach, O(N)) and getFragment(id) (new lazy approach, O(1)). Results on datasets with 10-1000 fragments show 10x-609x speedup for the lazy approach, confirming the motivation for PR lance-format#353. Tagged with @tag("benchmark") to exclude from normal test runs.
Add test.excludedGroups property (default: benchmark) to surefire config so @tag("benchmark") tests are excluded from normal mvn test runs. Override with -Dtest.excludedGroups= to include them. Run benchmarks with: mvn test -Dtest=FragmentLoadingBenchmarkTest \ -Dtest.excludedGroups= -Dgroups=benchmark
| * <p>Tagged with "benchmark" so it is excluded from normal test runs. | ||
| */ | ||
| @Tag("benchmark") | ||
| public class FragmentLoadingBenchmarkTest { |
There was a problem hiding this comment.
Is this microbenchmark sufficient to demonstrate the effect? We can run
mvn test -pl lance-spark-base_2.12 \
-Dtest=FragmentLoadingBenchmarkTest \
-Dtest.excludedGroups= \
-Dgroups=benchmark
to verify:
=== Fragment Loading Benchmark ===
Fragments | getFragments() (ms) | getFragment(id) (ms) | Speedup
----------------------------------------------------------------------
10 | 0.082 ms | 0.007 ms | 11.0x
50 | 0.329 ms | 0.008 ms | 43.2x
100 | 0.630 ms | 0.010 ms | 65.9x
500 | 2.929 ms | 0.008 ms | 380.9x
1000 | 6.064 ms | 0.008 ms | 760.4x
Notes:
- getFragments(): loads ALL fragment metadata (old eager approach)
- getFragment(id): loads ONE fragment by ID (new lazy approach)
- Each worker partition only needs one fragment, so the lazy approach avoids
loading metadata for all other fragments in the dataset.
The project does not integrate JMH yet, so this benchmark is relatively simple.
There was a problem hiding this comment.
If end-to-end impact is needed, we are running tests on a 1TB TPC-DS dataset, but this improvement likely won’t show a noticeable difference there.
There was a problem hiding this comment.
@summaryzb Do you have any test results about this ?
BatchScanExec.equals() compares batch objects via equals(), which delegates to LanceScan since it implements Batch. Without overriding equals/hashCode, Object identity is used, so two scans of the same table are never equal and Spark cannot reuse exchanges. Compare schema, readOptions, filters, limit, offset, topN, and aggregation. Exclude scanId (per-instance UUID for tracing only).
This reverts commit a34190c.
|
Ok, so diving a little deeper. The rust-side implementation has it's own caches; for metadata (LANCE_METADATA_CACHE_SIZE) and indexes (LANCE_INDEX_CACHE_SIZE). By caching the actual dataset handle, we're really only saving time in deserializing the dataset manifest bytes, which should be measureable in microseconds (or milliseconds at worst). I'm wondering if we should just remove the notion of Spark-side caches for everything (dataset + fragment) and rely on the rust-side Lance cache. This is basically what the |
|
That makes sense. Since the Session already caches metadata and index blocks on the Rust side, the Java-side
I'll also enhance Do you think this is reasonable? |
I'm not following exactly where these are going to be inserted. This gets a little bit tricky because these a global session configuration options and may not be reasonably applied to a single dataset? Maybe it's worth hacking together a proposal and we can iterate? |
|
@hamersaw I have refactored the code, removed |
| readOptions, | ||
| fragmentId, | ||
| dataset = | ||
| LanceRuntime.openDataset( |
There was a problem hiding this comment.
Let me check if we can use the utility methods from Utils here.
| inputPartition.getInitialStorageOptions(), | ||
| inputPartition.getNamespaceImpl(), | ||
| inputPartition.getNamespaceProperties()); | ||
| dataset = |
There was a problem hiding this comment.
@hamersaw On your earlier concern about blockSize/indexCacheSize/metadataCacheSize: this pr doesn't add new API for them now, but switching to Utils.openDatasetBuilder does mean the fragment-scan path now passes these through (previously they were dropped by LanceDatasetCache). This matches what LanceCountStarPartitionReader and ~27 other call sites already do. WDYT?
The refactor to remove LanceDatasetCache dropped namespace reconstruction on executors. Vended credentials (STS tokens) need the namespace client to refresh during long-running scans. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
hamersaw
left a comment
There was a problem hiding this comment.
Thanks, I think this looks great! I added the namespace build back in so that we can ensure credential refresh on long-running operations.
|
Thank you @hamersaw |
…ead options not applied (#476) ## Summary Fixes `org.apache.thrift.transport.TTransportException: GSS initiate failed` thrown on Spark executors when reading Lance tables registered in a Kerberized Hive Metastore (both plain `SELECT` and SQL DML). This PR does two things: 1. **Stops executors from rebuilding the namespace client unconditionally.** Adds a new read option `executor_credential_refresh` (default `true`, preserving current behavior). When set to `false`, executors skip the eager `namespace.describeTable()` RPC and open the dataset directly by URI using the storage options the driver already obtained. 2. **Makes catalog-level read options actually reach the typed fields.** Catalog-level conf (`--conf spark.sql.catalog.<name>.executor_credential_refresh=false`) is now parsed in `withCatalogDefaults()`, so `spark.sql(...)` queries (including `SELECT` and SQL DML) — which have no `spark.read.option(...)` attach point — pick up the flag the same way as DataFrameReader-based reads. ## Root Cause Since #353 removed `LanceDatasetCache`, `LanceFragmentScanner.create()` unconditionally rebuilds the `LanceNamespace` client on each Spark executor and binds it back onto `LanceSparkReadOptions`. This forces the dataset open through `Utils.OpenDatasetBuilder`'s `namespaceClient` branch, which in turn calls `OpenDatasetBuilder.buildFromNamespaceClient()` in the Lance Java SDK — and that path issues an eager `namespace.describeTable()` RPC before handing off to Rust. For catalogs where the backing service authenticates per-call (HMS over Kerberos, some REST catalogs), Spark executors typically do not have a Kerberos TGT — the `--keytab` / `--principal` credentials only reach the driver / ApplicationMaster, while executors run with Hadoop delegation tokens that cannot be used for HMS Thrift SASL. The `describeTable` RPC therefore fails with: ``` org.apache.thrift.transport.TTransportException: GSS initiate failed at org.lance.namespace.hive2.Hive2ClientPool.newClient(Hive2ClientPool.java:42) at org.lance.namespace.hive2.Hive2Namespace.describeTable(Hive2Namespace.java:285) at org.lance.OpenDatasetBuilder.buildFromNamespaceClient(OpenDatasetBuilder.java:205) at org.lance.OpenDatasetBuilder.build(OpenDatasetBuilder.java:191) at org.lance.spark.utils.Utils$OpenDatasetBuilder.build(Utils.java:140) at org.lance.spark.internal.LanceFragmentScanner.create(LanceFragmentScanner.java:67) ``` Driver-side operations (metadata-only queries, count-via-manifest) succeed because the driver has the TGT. The failure only manifests during fragment scans. ## Why the Existing Behavior Exists Rebuilding the namespace client on the executor is not dead code — it keeps the Rust `LanceNamespaceStorageOptionsProvider` attached so that short-lived vended credentials (STS tokens for S3 / GCS / Azure) returned by `describeTable()` can be refreshed when they expire mid-scan. Simply removing the rebuild would break long-running scans against object stores that use credential vending. ## Fix ### 1. Gate the executor-side rebuild behind a new option Add a boolean read option `executor_credential_refresh`, defaulting to `true`: - `true` (default): unchanged — executor rebuilds the namespace client and routes through the `namespaceClient` branch, preserving credential refresh. Safe for all existing users. - `false`: executor skips the rebuild, reads remain open via URI using the `initialStorageOptions` the driver already obtained from `describeTable()` at scan-plan time. ### 2. Make catalog-level conf actually reach the typed field Before this PR, `Builder.withCatalogDefaults(catalogConfig)` only merged the storage-options map and never parsed typed flags. As a result, the catalog-conf syntax looked like it should work but silently ignored the flag. This PR extracts a `parseTypedFlags(Map<String, String>)` helper and calls it from both `fromOptions()` and `withCatalogDefaults()`, so every recognized read option (not just `executor_credential_refresh`) now flows from catalog conf into the typed field. This is what makes the fix usable from SQL DML. Without the `withCatalogDefaults` parse, a user running `DELETE FROM kerberized_hms_lance_table WHERE id = 1` has no way to disable the rebuild — SQL DML has no per-statement `.option(...)` attach point. ### Configuration surfaces after this PR | Surface | Example | Works? | |---|---|---| | Per-read option | `spark.read.option("executor_credential_refresh", "false").table(...)` (DataFrameReader) | Yes (already worked before this PR; not available for `spark.sql("SELECT ...")`) | | Catalog conf + plain SELECT | `--conf spark.sql.catalog.lance.executor_credential_refresh=false` + `spark.sql("SELECT * FROM lance.db.t")` | Yes (fixed by this PR) | | Catalog conf + SQL DML | `--conf spark.sql.catalog.lance.executor_credential_refresh=false` + `spark.sql("DELETE FROM lance.db.t WHERE id=1")` | Yes (fixed by this PR) | Intended usage for HMS + Kerberos deployments: ``` spark-submit ... \ --keytab /etc/keytabs/my.keytab \ --principal my/principal@REALM \ --conf spark.sql.catalog.lance.executor_credential_refresh=false ``` ## Per-Namespace Trade-off Analysis The refresh callback is meaningful only for namespaces that actually return `storage_options` from `describeTable()`. Survey of the impls in `lance-namespace-impls`: | Namespace | `describeTable()` populates `storage_options`? | Cost of `executor_credential_refresh=false` | |----------------------------|------------------------------------------------|---------------------------------------------------------------------------------| | `Hive2Namespace` | No — `setLocation` only | **None.** The refresh callback is a no-op for HMS regardless of underlying storage. | | `Hive3Namespace` | No — `setLocation` only | **None.** Same as Hive2. | | `GlueNamespace` | Static `config.getStorageOptions()` | Effectively none for plain Glue. Use the default if you rely on LakeFormation vended creds. | | `IcebergNamespace` (REST) | Yes — vended creds typical | Long scans against vended creds will fail when the credential expires. | | `PolarisNamespace` | Yes — vended creds typical | Same as Iceberg REST. | | `UnityNamespace` | Yes — Databricks-vended temp creds | Same as Iceberg REST. | Concretely, for the HMS + S3 case: HMS does **not** vend S3 credentials (`describeTable()` only sets `location`), so the executor's S3 access is governed entirely by the AWS SDK credential chain (instance profile / `hive-site.xml` / env vars / `~/.aws/credentials`) and the AWS SDK handles all STS rotation independently. The Lance refresh callback would have nothing to refresh, so disabling it costs nothing in practice. ## Scope of Change - `LanceSparkReadOptions.java`: - New constant `CONFIG_EXECUTOR_CREDENTIAL_REFRESH`, new field `executorCredentialRefresh` (default `true`), builder / getter / `withVersion` propagation / `equals` / `hashCode`, Javadoc covering per-namespace trade-off. - Extracted `Builder.parseTypedFlags(Map<String, String>)` helper from the previously duplicated `fromOptions` body, now called from both `fromOptions()` and `withCatalogDefaults()`. This incidentally also fixes silent ignores of `push_down_filters`, `batch_size`, `topN_push_down`, etc. when set at the catalog level — a pre-existing latent issue uncovered while fixing the primary bug. - `LanceFragmentScanner.java`: add `&& readOptions.isExecutorCredentialRefresh()` to the existing rebuild `if`, inline comment explaining the trade-off. - `LanceSparkReadOptionsSerializationTest.java`: six new tests covering default value, map parsing, serialization round-trip, `withVersion` propagation, catalog-defaults path, and per-read override precedence. No public API signature is changed; no existing behavior is altered for users who do not set the new option. ## Test Plan New unit tests (all 305 tests in `lance-spark-base_2.12` pass locally): - `testExecutorCredentialRefreshDefaultsToTrue` — default value preserved. - `testExecutorCredentialRefreshParsedFromOptions` — flag honored from both `"true"` and `"false"` map entries. - `testExecutorCredentialRefreshSurvivesSerialization` — flag survives Java serialization (critical: it must reach the executor). - `testExecutorCredentialRefreshPreservedByWithVersion` — flag propagated by `withVersion()` used during scan-plan version pinning. - `testExecutorCredentialRefreshFromCatalogDefaults` — new; guards the catalog-conf path used by SQL DML. - `testPerReadOptionOverridesCatalogDefaults` — new; pins the precedence rule "per-read `.option(...)` wins over catalog default". - `LanceFragmentScannerTest.java`: one new end-to-end test that locks in the executor-branch contract through `LanceFragmentScanner.create` (per reviewer suggestion). Integration test (out-of-band, on internal YARN + HMS + Kerberos + HDFS cluster): 1. `spark-submit` with `--keytab` / `--principal`, Kerberized HMS, `SELECT * FROM lance_hms_table` via `lance-namespace-hive2`. 2. **Before fix**: executor task fails with `GSS initiate failed` on `describeTable`. Reproducible across multiple partitions / runs. 3. **After fix** with `--conf spark.sql.catalog.lance.executor_credential_refresh=false`: scan completes, returns expected row count and sample rows. 4. Default path (`true`) with the same fix jar: behavior unchanged. ## Backward Compatibility Default is `true`, so every existing job behaves identically without touching configs. Only users who explicitly set the new option to `false` opt into the new path. Co-authored-by: xiaguanglei <xiaguanglei@qiyi.com>
|
@hamersaw — following up on this. Removing the Java-side dataset cache to rely on Rust-side Repro on a 34 GiB MinIO TPC-DS
Without the fix, every JMH iteration pays the full cold open cost (q1-lance 12.7 s ± 146 ms, CV ≈ 1% — i.e. all 10 iterations identical because each one rebuilds the credential chain + HTTP client). Fix is in lance-format/lance#6839: per-
Standalone |
Summary
LanceDatasetCache; the Rust-sideSessionalready caches metadata and index blocks, so a Java-sideDatasetcache only saves manifest deserialization (microseconds).dataset.getFragment(id)per partition, instead of eagerly buildingMap<Integer, Fragment>fromdataset.getFragments()on every cache miss — O(N) in total fragment count, even though each partition reads one fragment.LanceFragmentScanneropens, owns, and closes its ownDatasetthrough a newLanceRuntime.openDataset()helper.Changes
LanceDatasetCache.javaLanceRuntime.javaopenDataset()— wires the catalogSession, reconstructs the namespace from the(namespaceImpl, namespaceProperties)strings carried inLanceInputPartition, merges storage options, and pins the version.LanceFragmentScanner.javacreate()opens its ownDatasetand loads only the target fragment;close()closes both scanner and dataset, usingThrowable.addSuppressedso the first failure isn't masked by cleanup.Before / after
The per-catalog Rust
Session(configured viaLANCE_INDEX_CACHE_SIZE/LANCE_METADATA_CACHE_SIZE) and the global ArrowBufferAllocatorare unchanged.