[SPARK-56743][SPARK-56773][SQL][CORE][TESTS] Exercise writer-stage retries in DSv2 DML metric tests and fix injection-state cleanup under AQE#56597
Conversation
…ric tests; add AQE injection coverage Follow-up to SPARK-56743 (the INJECT_SHUFFLE_FETCH_FAILURES injection infra and SQLLastAttemptMetric for DSv2 DML metrics). Makes the DSv2 MERGE/UPDATE retry tests actually trigger a writer-stage retry, fixes a DAGScheduler bug that prevented that under AQE, and closes the test gap that let the bug through. DAGScheduler: stop evicting the test-only injection state per-stage. cleanupStateForJobAndIndependentStages removed the per-shuffle injection bookkeeping (the three injectShuffleFetchFailures* maps) whenever a stage was removed. Under AQE each Exchange is materialized as its own map-stage job, so that cleanup ran between the producer job and the consumer job and dropped the pending deferred corruption before the consumer was ever submitted - no FetchFailed, no retry. The maps are keyed by the globally-unique (never-reused) shuffleId, only allocated under Utils.isTesting, and bounded by the number of shuffles in a test SparkContext, so they are retained for its lifetime (restoring the original body of that cleanup loop). MetricsFailureInjectionSuite: add AQE coverage. The existing INJECT_SHUFFLE_FETCH_FAILURES tests all run with AQE disabled (the suite mixes in DisableAdaptiveExecutionSuite), so none exercised AQE's per-shuffle materialization - the exact path where the eviction above suppressed the retry. New test "Three stage metrics block failure injection with AQE" runs the same 3-stage query with ADAPTIVE_EXECUTION_ENABLED=true and asserts the non-leaf stage's raw counter overcounts (a retry actually fired) while SLAM stays stable. It fails on the pre-fix code and passes after. DSv2 MERGE/UPDATE retry tests: the "metric values are stable across stage retries" tests now run under the injection and exercise a real retry. For the metadata MERGE variants - where the writer's RequiresDistributionAndOrdering forces a re-shuffle between MergeRowsExec and the writer - MergeRowsExec sits in a non-leaf shuffle map stage, re-runs under the injection, and its raw per-row counters overcount while the SLAM-aware MergeSummary stays correct; the test asserts both. The noMetadata variants skip the overcount assertion (there MergeRowsExec is in the result stage and cannot be re-run by an upstream injection). UPDATE writer-side metrics live on the result stage and single-count by design, so that test is regression coverage that retries don't break the SLAM-aware UpdateSummary. Adds a noMetadata accessor on RowLevelOperationSuiteBase so MERGE variants can branch on whether the writer requires a re-shuffle. Co-authored-by: Isaac
Instead of leaving the injectShuffleFetchFailures* entries for the test SparkContext's lifetime, evict each shuffle's entry at the correct lifecycle point: when its map outputs are unregistered. A CleanerListener whose shuffleCleaned drops the entry from all three maps is attached lazily from the test-gated injection path (sc.cleaner is created after the DAGScheduler, so it can't be attached in the constructor; the attach point is only reached under Utils.isTesting + INJECT_SHUFFLE_FETCH_FAILURES, so it never runs in production). This is still not done on stage removal (cleanupStateForJobAndIndependent- Stages): under AQE each Exchange is materialized as its own map-stage job whose stage is removed before the consuming stage runs, so a stage-removal eviction would drop a pending corruption before its consumer is submitted. Co-authored-by: Isaac
9bc7c8a to
c522bcb
Compare
|
@cloud-fan here's a PR to use the infra from #55738 for additional metrics tests, and also a small fix to the infra - it did not work in AQE, because AQE submits each stage separately. |
cloud-fan
left a comment
There was a problem hiding this comment.
0 blocking, 2 non-blocking, 0 nits. Clean test-hardening PR; the test-only DAGScheduler fix is correct and properly gated. Two notes on making the strengthened tests fail-loud.
Correctness (1)
MetricsFailureInjectionSuite.scala:424: AQE test drops 4 assertions its non-AQE sibling has, despite the "same test" comment — see inline
Suggestions (1)
UpdateTableSuiteBase.scala:345: UPDATE / noMetadata-MERGE tests don't independently verify a retry fired — see inline
| } | ||
|
|
||
| test("Three stage metrics block failure injection with AQE") { | ||
| // Same as the previous test but with AQE enabled. Under AQE each Exchange is materialized |
There was a problem hiding this comment.
This test drops four assertions its non-AQE sibling has — stage3Metric.value === 5 and the three lastAttemptValueForDataset(finalDf) checks — yet the comment calls it the same test. If they don't hold under AQE (e.g. dataset lookup through AdaptiveSparkPlanExec), say so here; otherwise restore them so the AQE path gets equal coverage.
There was a problem hiding this comment.
Told my agent to stop being sloppy and readd the assertion :-).
| // test passes equally well with plain SQLMetric — it only exercises the SLAM-aware | ||
| // read path. Follow-up #55738 will add infra to actually retry the writer stage and | ||
| // exercise the SLAM behavior end-to-end for UPDATE. | ||
| // INJECT_SHUFFLE_FETCH_FAILURES corrupts the partition-0 task of the first successful |
There was a problem hiding this comment.
Nothing here asserts a retry actually fired. With the writer single-counting by design, the test passes even if the injection silently stops retrying — the vacuous-pass gap this PR closes for metadata MERGE. Consider asserting an upstream raw-metric overcount, or note that the new infra-level AQE test is what guards retry-fires. (Same for the MERGE if (!noMetadata) path.)
There was a problem hiding this comment.
yes, unfortunately it's still impossible to assert for overcounts in result stage metrics, because Spark does not support any retries in this stage. But the infra PR at least added more "interesting" scenarios of restarts, and it added coverage for Merge with metadata.
The new infra-level AQE tests checks that retry fires - it asserts overcount in stage1 and stage2, but not stage3 (again, because there can't be restarts in result stage).
…omments Review on apache#56597: - Restore the four assertions the AQE injection test dropped relative to its non-AQE sibling (stage3Metric === 5 and the three lastAttemptValueForDataset(finalDf) checks); verified they hold under AQE, so the AQE path now gets equal coverage. - Note in the MERGE/UPDATE retry tests that the metadata-MERGE overcount assertion doubles as a direct retry check, and that the noMetadata-MERGE / UPDATE result-stage paths have no overcounting metric to assert. Co-authored-by: Isaac
cloud-fan
left a comment
There was a problem hiding this comment.
2 addressed, 0 remaining, 1 new. (1 new = 0 newly introduced, 1 late catch — my own miss from the prior round.)
0 blocking, 0 non-blocking, 1 nit.
Both prior notes are resolved: the AQE test's dropped assertions are restored (now matching the non-AQE sibling exactly), and the MERGE metadata variants directly assert the raw MergeRowsExec accumulator overcounts. The test-only DAGScheduler fix is correct and properly production-gated.
Nits: 1 minor item (see inline comment).
…MetricsFailureInjectionSuite.scala Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
|
thanks, merging to master/4.x |
…ries in DSv2 DML metric tests and fix injection-state cleanup under AQE ### What changes were proposed in this pull request? Follow-up to [SPARK-56743](https://issues.apache.org/jira/browse/SPARK-56743) (SQLLastAttemptMetric for DSv2 DML metrics) and [SPARK-56773](https://issues.apache.org/jira/browse/SPARK-56773) (the `INJECT_SHUFFLE_FETCH_FAILURES` injection knobs). It makes the DSv2 MERGE/UPDATE retry tests actually trigger a writer-stage retry, fixes a `DAGScheduler` bug that prevented that under AQE, and closes the test-coverage gap that let the bug through. Four parts: **1. DAGScheduler: stop evicting the test-injection state on stage removal.** `cleanupStateForJobAndIndependentStages` removed the per-shuffle injection bookkeeping (the three `injectShuffleFetchFailures*` maps) whenever a stage was removed. Under AQE each `Exchange` is materialized as its *own* map-stage job, so that cleanup ran *between* the producer job and the consumer job and dropped the pending deferred corruption before the consumer was ever submitted - no `FetchFailed`, no retry. **2. DAGScheduler: evict that state at the correct lifecycle point instead.** The injection state mirrors a shuffle's `MapStatus`es, which live until the shuffle's map outputs are unregistered. A `CleanerListener` whose `shuffleCleaned` drops the shuffle's entry from all three maps is attached lazily from the test-gated injection path (`sc.cleaner` is created after the `DAGScheduler`, so it cannot be attached in the constructor; the attach point is only reached under `Utils.isTesting` + `INJECT_SHUFFLE_FETCH_FAILURES`, so it never runs in production). **3. `MetricsFailureInjectionSuite`: add AQE coverage.** The existing `INJECT_SHUFFLE_FETCH_FAILURES` tests all run with AQE disabled (the suite mixes in `DisableAdaptiveExecutionSuite`), so none exercised AQE's per-shuffle-materialization path - the exact path where the eviction in (1) suppressed the retry. New test `Three stage metrics block failure injection with AQE` runs the same 3-stage query with `ADAPTIVE_EXECUTION_ENABLED=true` and asserts the non-leaf stage's raw counter overcounts (a retry actually fired) while SLAM stays stable. It fails on the pre-fix code and passes after. **4. DSv2 MERGE/UPDATE retry tests** (`MergeIntoTableSuiteBase`, `UpdateTableSuiteBase`): the `"metric values are stable across stage retries"` tests now run under the injection and exercise a real retry. For the metadata MERGE variants - where the writer's `RequiresDistributionAndOrdering` forces a re-shuffle between `MergeRowsExec` and the writer - `MergeRowsExec` sits in a non-leaf shuffle map stage, re-runs under the injection, and its raw per-row counters overcount, while the SLAM-aware `MergeSummary` stays correct; the test asserts both. The `noMetadata` variants skip the overcount assertion (there `MergeRowsExec` is in the result stage and cannot be re-run by an upstream injection). UPDATE writer-side metrics live on the result stage and single-count by design (`ResultStage.findMissingPartitions` only re-runs not-yet-completed partitions), so that test is regression coverage that retries don't break the SLAM-aware `UpdateSummary`. A `noMetadata` accessor is added on `RowLevelOperationSuiteBase` so MERGE variants can branch on whether the writer requires a re-shuffle. ### Why are the changes needed? The DSv2 DML retry tests added in SPARK-56743 only verified that SLAM values stay correct *given* retries happen - which is vacuously true even when no retry fires. With the merged injection infra they did not actually trigger a writer-stage retry under AQE, because the per-stage eviction dropped the deferred corruption between AQE's per-shuffle jobs. This PR makes the tests demand a real retry (raw-metric overcount), fixes the infra so that retry actually happens under AQE, and adds an infra-level AQE test so the regression is caught directly in `MetricsFailureInjectionSuite` rather than only end-to-end. ### Does this PR introduce _any_ user-facing change? No. The `DAGScheduler` change only affects test-only state and a test-only `CleanerListener`, both reached only under `Utils.isTesting`; the rest is test code. ### How was this patch tested? - New + existing `MetricsFailureInjectionSuite` (13 tests, incl. the new AQE test) pass; the new AQE test was confirmed to fail on the pre-fix code (eviction on stage removal) and pass after. - `SQLLastAttemptMetricIntegrationSuite` (+ `WithStageRetries` / `WithChecksumMismatch`) and `SQLLastAttemptMetricPlanShapesSuite` still pass (258 tests) - no regression from the `DAGScheduler` change. - All 4 MERGE and 4 UPDATE row-level-operation variants pass; metadata MERGE genuinely overcounts the raw `MergeRowsExec` accumulator (`numTargetRowsUpdated=6`) while `MergeSummary` reports 2. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code, Opus 4.8. Closes #56597 from juliuszsompolski/SPARK-56743-extratests. Authored-by: Juliusz Sompolski <julek@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 2eb2ac2) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Follow-up to SPARK-56743 (SQLLastAttemptMetric
for DSv2 DML metrics) and SPARK-56773 (the
INJECT_SHUFFLE_FETCH_FAILURESinjection knobs). It makes the DSv2 MERGE/UPDATE retry testsactually trigger a writer-stage retry, fixes a
DAGSchedulerbug that prevented that under AQE, andcloses the test-coverage gap that let the bug through.
Four parts:
1. DAGScheduler: stop evicting the test-injection state on stage removal.
cleanupStateForJobAndIndependentStagesremoved the per-shuffle injection bookkeeping (the threeinjectShuffleFetchFailures*maps) whenever a stage was removed. Under AQE eachExchangeismaterialized as its own map-stage job, so that cleanup ran between the producer job and the
consumer job and dropped the pending deferred corruption before the consumer was ever submitted -
no
FetchFailed, no retry.2. DAGScheduler: evict that state at the correct lifecycle point instead. The injection state
mirrors a shuffle's
MapStatuses, which live until the shuffle's map outputs are unregistered. ACleanerListenerwhoseshuffleCleaneddrops the shuffle's entry from all three maps is attachedlazily from the test-gated injection path (
sc.cleaneris created after theDAGScheduler, so itcannot be attached in the constructor; the attach point is only reached under
Utils.isTesting+INJECT_SHUFFLE_FETCH_FAILURES, so it never runs in production).3.
MetricsFailureInjectionSuite: add AQE coverage. The existingINJECT_SHUFFLE_FETCH_FAILUREStests all run with AQE disabled (the suite mixes inDisableAdaptiveExecutionSuite), so none exercised AQE's per-shuffle-materialization path - theexact path where the eviction in (1) suppressed the retry. New test
Three stage metrics block failure injection with AQEruns the same 3-stage query withADAPTIVE_EXECUTION_ENABLED=trueand asserts the non-leaf stage's raw counter overcounts (a retryactually fired) while SLAM stays stable. It fails on the pre-fix code and passes after.
4. DSv2 MERGE/UPDATE retry tests (
MergeIntoTableSuiteBase,UpdateTableSuiteBase): the"metric values are stable across stage retries"tests now run under the injection and exercise areal retry. For the metadata MERGE variants - where the writer's
RequiresDistributionAndOrderingforces a re-shuffle between
MergeRowsExecand the writer -MergeRowsExecsits in a non-leafshuffle map stage, re-runs under the injection, and its raw per-row counters overcount, while the
SLAM-aware
MergeSummarystays correct; the test asserts both. ThenoMetadatavariants skip theovercount assertion (there
MergeRowsExecis in the result stage and cannot be re-run by anupstream injection). UPDATE writer-side metrics live on the result stage and single-count by
design (
ResultStage.findMissingPartitionsonly re-runs not-yet-completed partitions), so thattest is regression coverage that retries don't break the SLAM-aware
UpdateSummary. AnoMetadataaccessor is added on
RowLevelOperationSuiteBaseso MERGE variants can branch on whether thewriter requires a re-shuffle.
Why are the changes needed?
The DSv2 DML retry tests added in SPARK-56743 only verified that SLAM values stay correct given
retries happen - which is vacuously true even when no retry fires. With the merged injection infra
they did not actually trigger a writer-stage retry under AQE, because the per-stage eviction
dropped the deferred corruption between AQE's per-shuffle jobs. This PR makes the tests demand a
real retry (raw-metric overcount), fixes the infra so that retry actually happens under AQE, and
adds an infra-level AQE test so the regression is caught directly in
MetricsFailureInjectionSuiterather than only end-to-end.
Does this PR introduce any user-facing change?
No. The
DAGSchedulerchange only affects test-only state and a test-onlyCleanerListener, bothreached only under
Utils.isTesting; the rest is test code.How was this patch tested?
MetricsFailureInjectionSuite(13 tests, incl. the new AQE test) pass; the newAQE test was confirmed to fail on the pre-fix code (eviction on stage removal) and pass after.
SQLLastAttemptMetricIntegrationSuite(+WithStageRetries/WithChecksumMismatch) andSQLLastAttemptMetricPlanShapesSuitestill pass (258 tests) - no regression from theDAGSchedulerchange.the raw
MergeRowsExecaccumulator (numTargetRowsUpdated=6) whileMergeSummaryreports 2.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code, Opus 4.8.