Skip to content

[Improvement]: Refactor snapshot-expiring via ProcessFactory plugin#4107

Merged
baiyangtx merged 9 commits into
apache:masterfrom
baiyangtx:upstream/SnapshotsExpiring-processFactory
Mar 17, 2026
Merged

[Improvement]: Refactor snapshot-expiring via ProcessFactory plugin#4107
baiyangtx merged 9 commits into
apache:masterfrom
baiyangtx:upstream/SnapshotsExpiring-processFactory

Conversation

@baiyangtx

@baiyangtx baiyangtx commented Mar 5, 2026

Copy link
Copy Markdown
Contributor

Why are the changes needed?

Refactor snapshot expiring action via ProcessFactory plugin

Brief change log

  • Refactor Iceberg snapshot expiration from the inline scheduler into a pluggable process model (ProcessFactory + ExecuteEngine).
  • Introduce IcebergProcessFactory and SnapshotsExpiringProcess, executed by the default LocalExecutionEngine with async execution, status tracking and cancellation.
  • Externalize enablement/interval knobs via plugin YAML and new config options (e.g. expire-snapshots.enabled, expire-snapshots.interval).
  • Add unit tests for IcebergProcessFactory and LocalExecutionEngine, and update the “last snapshots expiring” timestamp only after the actual expiration run finishes to preserve correct interval semantics.

How was this patch tested?

  • Add some test cases that check the changes thoroughly including negative and positive cases if possible

  • Add screenshots for manual tests if appropriate

  • Run test locally before making a pull request

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@czy006 czy006 self-requested a review March 12, 2026 13:42
@baiyangtx baiyangtx force-pushed the upstream/SnapshotsExpiring-processFactory branch from 709fc06 to b5e817f Compare March 12, 2026 13:50
@baiyangtx baiyangtx marked this pull request as ready for review March 12, 2026 13:51
ConfigOptions.key("expire-snapshots.enabled").booleanType().defaultValue(true);

public static final ConfigOption<Duration> SNAPSHOT_EXPIRE_INTERVAL =
ConfigOptions.key("expire-snapshot.interval")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YAML is expire-snapshots.interval

properties.keySet().stream()
.filter(key -> key.startsWith(POOL_CONFIG_PREFIX))
.map(key -> key.substring(POOL_CONFIG_PREFIX.length()))
.map(key -> key.substring(0, key.indexOf(".") + 1))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

last result is pool.default..thread-count / pool.snapshots-expiring..thread-count, that's not get the pool

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fix it and add some tests

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org.apache.amoro.process.ExecuteEngine

@Override
public void run() {
try {
AmoroTable<?> amoroTable = tableRuntime.loadTable();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that the new scheduling path no longer preserves the old “run, then record cleanup time” behavior for snapshot expiration.

In the old implementation, SnapshotsExpiringExecutor.java executed tableMaintainer.expireSnapshots() synchronously. Only after that finished did PeriodicTableScheduler.java (line 125) update lastCleanTime and schedule the next run. So the interval was effectively measured from the end of the previous cleanup.

In the new path, ActionCoordinatorScheduler.java (line 103) only submits/registers a process and returns immediately. After that return, PeriodicTableScheduler still updates lastCleanTime right away, even though the real cleanup work has not finished yet. The actual cleanup now happens later in SnapshotsExpiringProcess.java (line 53).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Building on your observation — the async submission also introduces a state-loss issue in LocalExecutionEngine.getStatus().

getStatus() removes the Future from the map on terminal states (isDone/isCancelled), making it non-idempotent:

Call 1: future.isDone() == true → remove → SUCCESS
Call 2: future == null          → KILLED  (wrong!)

TableProcessExecutor polls getStatus() in a loop (line 107), so if any retry or concurrent access queries the same identifier twice after completion, it gets KILLED instead of the real result.

There's also a TOCTOU race between containsKey and get across cancelingInstances/activeInstances (lines 67-70), since the compound check-then-act isn't atomic even with ConcurrentHashMap.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The information of a completed process is not cleaned up immediately, but waits for a TTL period, which is long enough for the process executor to obtain its final status and persist the state into the database.

@czy006

czy006 commented Mar 13, 2026

Copy link
Copy Markdown
Contributor

It looks like IcebergProcessFactory receives available execute engines too early.

In AmoroServiceContainer, availableExecuteEngines(executeEngineManager.installedPlugins()) is called before executeEngineManager.initialize(), so installedPlugins() is still empty at that point. As a result, IcebergProcessFactory.localEngine is never set.

Later, when snapshot expiration is triggered, triggerExpireSnapshot() returns Optional.empty() because localEngine == null, so no SnapshotsExpiringProcess is ever created or submitted.

In other words, the new expire-snapshots path is effectively disabled due to initialization order. We probably need to initialize execute engines before injecting them into process factories, or re-inject them after engine initialization.

@baiyangtx baiyangtx force-pushed the upstream/SnapshotsExpiring-processFactory branch from 2381805 to d4fb5e7 Compare March 16, 2026 06:19
…actory

Co-Authored-By: Aime <aime@bytedance.com>
@baiyangtx baiyangtx force-pushed the upstream/SnapshotsExpiring-processFactory branch from d4fb5e7 to 1846d5b Compare March 16, 2026 06:22
zhangyongxiang.alpha and others added 8 commits March 16, 2026 15:26
新增 `IcebergProcessFactory` 与 `LocalExecutionEngine` 的单测覆盖。

- IcebergProcessFactory:覆盖 open/supportedActions、触发策略 interval、触发/不触发/禁用等分支
- LocalExecutionEngine:覆盖 tag 线程池选择、取消流程、失败状态、完成态 TTL 过期与非法 identifier

本地验证:
- mvn -pl amoro-common -am -Dtest=TestLocalExecutionEngine test
- mvn -pl amoro-ams -am -Dtest=TestIcebergProcessFactory test

Co-Authored-By: Aime <aime@bytedance.com>
Change-Id: I14dcd3c1286f2be72f8135777e1a81568d060b7d
修正新增单测文件被误标记为可执行(100755)的问题,避免在 MR 中产生无意义的 mode diff。

Co-Authored-By: Aime <aime@bytedance.com>
Change-Id: I83c82894a66514cabecff2751e22c1c418469ac5

@zhoujinsong zhoujinsong left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall this is a clean refactor that migrates snapshot expiring from the inline executor into the ProcessFactory framework. A few minor notes:

  1. Duplicate interval check (IcebergProcessFactory.triggerExpireSnapshot): The time-since-last-execution guard is redundant since ActionCoordinatorScheduler already controls trigger frequency via ProcessTriggerStrategy.FIXED_RATE. Consider removing the manual check from the factory to avoid two sources of truth for scheduling state.

  2. retryNumber on recovery: processMeta.getRetryNumber() may deserialize as 0 if the field is not persisted, causing already-failed processes to reset their retry count on AMS restart. Worth verifying the DB mapping.

  3. Empty getSummary(): SnapshotsExpiringProcess.getSummary() returns an empty map, so nothing is recorded in the process store after execution. Even a minimal summary (e.g. elapsed time) would help with debugging.

None of these are blockers. LGTM.

@baiyangtx baiyangtx merged commit 0721df9 into apache:master Mar 17, 2026
6 checks passed
@baiyangtx baiyangtx deleted the upstream/SnapshotsExpiring-processFactory branch March 17, 2026 13:51
@xxubai xxubai added this to the Release 0.9.0 milestone Mar 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants