Flink: Maintenance - TriggerManager#10484
Conversation
|
Hi @stevenzwu, I would like to move forward with this PR.
WDYT @stevenzwu? Could we move forward along these lines, or do you have another proposal? Do you have any foundational concerns about the PR? Thanks, |
| interface Lock { | ||
| /** | ||
| * Tries to acquire a lock with a given key. Anyone already holding a lock would prevent | ||
| * acquiring this lock. Not reentrant. | ||
| * | ||
| * <p>Called by {@link TriggerManager}. Implementations could assume that are no concurrent | ||
| * calls for this method. | ||
| * | ||
| * @return <code>true</code> if the lock is acquired by this job, <code>false</code> if the lock | ||
| * is already held by someone | ||
| */ | ||
| boolean tryLock(); | ||
|
|
||
| /** | ||
| * Checks if the lock is already taken. | ||
| * | ||
| * @return <code>true</code> if the lock is held by someone | ||
| */ | ||
| boolean isHeld(); | ||
|
|
||
| // TODO: Fix the link to the LockRemover when we have a final name and implementation | ||
| /** | ||
| * Releases the lock. Should not fail if the lock is not held by anyone. | ||
| * | ||
| * <p>Called by LockRemover. Implementations could assume that are no concurrent calls for this | ||
| * method. | ||
| */ | ||
| void unlock(); | ||
| } |
There was a problem hiding this comment.
Why are creating new interfaces rather than enhancing the existing ones ?
https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/LockManager.java
There was a problem hiding this comment.
Notice the different requirements for the methods. This lock is not reentrant, and the unlock removes locks held by others.
|
Merged to master. |
| throw new UncheckedInterruptedException(e, "Interrupted during unlock"); | ||
| } catch (UncheckedSQLException e) { | ||
| throw e; | ||
| } catch (SQLException e) { |
There was a problem hiding this comment.
With SQLException caught on line 257, it seems this clause is not reachable.
For instanceId(), SQLException is caught inside the method as well.
There was a problem hiding this comment.
The pool.run(..) method could throw SQLException. For example on reconnect.
There was a problem hiding this comment.
I looked at the reconnect method in core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java.
It seems SQLException is caught there as well.
BTW thanks for the quick response.
There was a problem hiding this comment.
The JdbcClientPool extends ClientPoolImpl<Connection, SQLException>. The ClientPoolImpl defines run like this:
@Override
public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
return run(action, retryByDefault);
}
So we either need to declare SQLException in throws, or we need to handle the exception.
| List<TriggerEvaluator> evaluators, | ||
| long minFireDelayMs, | ||
| long lockCheckDelayMs) { | ||
| Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); |
(cherry picked from commit b2cd6f3)
(cherry picked from commit f2d6275)
(cherry picked from commit ab0594b)
The responsibility of the Trigger Manager is to start the Maintenance Tasks based on the incoming Table Change messages and prevent overlapping Maintenance Task runs. The event time of the Trigger messages should be monotonically increasing, as this will be used as a watermark to separate Maintenance Task runs.
Implementation for #10301
This will be used for Maintenance Trigger Manager as described in the design doc. The Trigger Manager paragraph describes the requirements in more detail.