feat: OracleDB support for CDC via LogMiner#4036
feat: OracleDB support for CDC via LogMiner#4036josephwoodward wants to merge 27 commits intomainfrom
Conversation
|
Claude finished @josephwoodward's task in 4m 22s —— View job
Commits
Review Well-structured CDC implementation with good architectural separation (LogMiner engine, session manager, transaction buffer, SQL redo parser). The integration tests are thorough and follow project patterns (testcontainers-go,
|
|
42cd45d to
e5b3386
Compare
|
Claude finished @josephwoodward's task in 8m 22s —— View job
Commits
Review Well-structured CDC implementation with good architectural separation (LogMiner engine, session manager, transaction buffer, SQL redo parser). Component registration (
|
internal/plugins/info.csv
Outdated
| openai_transcription ,processor ,openai_transcription ,4.32.0 ,certified ,n ,y ,y | ||
| openai_translation ,processor ,openai_translation ,4.32.0 ,certified ,n ,y ,y | ||
| opensearch ,output ,OpenSearch ,0.0.0 ,certified ,n ,y ,y | ||
| oracledb_cdc ,input ,oracledb_cdc ,0.0.0 ,enterprise ,n ,y ,y |
There was a problem hiding this comment.
We need to set a version.
| lm.log.Infof("Caught up with redo logs, backing off") | ||
| } | ||
| time.Sleep(lm.cfg.MiningBackoffInterval) | ||
| } |
There was a problem hiding this comment.
Bug: time.Sleep blocks without checking context cancellation, so a shutdown signal arriving during the backoff interval is not noticed until the sleep completes. Since MiningBackoffInterval is user-configurable (and could be set to minutes), this delays graceful shutdown by up to that duration.
Per godev patterns — check for cancellation in long-running loops using a select on ctx.Done(). Replace the time.Sleep with a context-aware backoff:
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(lm.cfg.MiningBackoffInterval):
}This also lets you remove the select/default wrapper above (lines 128-131) since cancellation is now checked at the sleep point.
There was a problem hiding this comment.
This was intentional as it's a tight loop so can result in a lot of unnecessary allocations, will need to benchmark to understand impact vs the additional wait time.
|
|
||
| == Permissions | ||
|
|
||
| When using the default Oracle based cache, the Connect user requires permission to create tables and stored procedures, and the ` + "rpcn" + ` schema must already exist. Refer to ` + "`" + ociFieldCheckpointCacheTableName + "`" + ` for more information. |
There was a problem hiding this comment.
We probably should have more actionable information here if possible
| } else if policy.IsNoop() { | ||
| policy.Count = 1 | ||
| } |
There was a problem hiding this comment.
That is to say that Count must be > 0?
| // Reset our stop signal | ||
| i.stopSig = shutdown.NewSignaller() | ||
|
|
||
| go func() { |
There was a problem hiding this comment.
Consider extracting to function that can return error and do not do ad-hoc
i.stopSig.TriggerHasStopped()
return
There was a problem hiding this comment.
You could actually use the wg, _ := errgroup.WithContext(softCtx) defined below
| return | ||
| } | ||
|
|
||
| if err = i.cacheSCN(softCtx, maxSCN); err != nil { |
There was a problem hiding this comment.
All the error reporting would be easier in a method.
| } | ||
|
|
||
| var currentSCN SCN | ||
| if err := s.db.QueryRowContext(ctx, "SELECT CURRENT_SCN FROM V$DATABASE").Scan(¤tSCN); err != nil { |
There was a problem hiding this comment.
nit: I saw sometimes we would use ` not " for SQL queries
| } | ||
| var types []*sql.ColumnType | ||
| if types, err = batchRows.ColumnTypes(); err != nil { | ||
| batchRows.Close() |
There was a problem hiding this comment.
This is quite risky if we miss it, could we put that to defer?
| } | ||
| // Add greater-than condition for current column | ||
| paramIdx++ | ||
| condParts = append(condParts, fmt.Sprintf(`"%s" > :%d`, pk[i], paramIdx)) |
There was a problem hiding this comment.
Maybe would be easier with StringBuilder?
| // UPDATE: update "schema"."table" set "C1" = 'v1', "C2" = 'v2' where "C1" = 'old1' and "C2" = 'old2'; | ||
| // DELETE: delete from "schema"."table" where "C1" = 'v1' and "C2" = 'v2'; | ||
| type Parser struct { | ||
| valueConverter *OracleValueConverter |
There was a problem hiding this comment.
This package seems to be quite pointer heavy, maybe we could be using less pointers?
| return nil | ||
| } | ||
|
|
||
| dateStr := matches[1] |
| const ( | ||
| OciFieldLogMiner = "logminer" | ||
|
|
||
| OciFieldSCNWindowSize = "scn_window_size" | ||
| OciFieldBackoffInterval = "backoff_interval" | ||
| OciFieldMiningStrategy = "strategy" | ||
| OciFieldMaxTransactionEvents = "max_transaction_events" |
There was a problem hiding this comment.
Feels like service integration should be in the top level package.
| } | ||
|
|
||
| // StartSession starts a LogMiner session with ONLINE_CATALOG strategy | ||
| func (sm *SessionManager) StartSession(ctx context.Context, startSCN, endSCN uint64, committedDataOnly bool) error { |
There was a problem hiding this comment.
Do we want to add protection against calling it out of order?
|
|
||
| switch dml.Operation { | ||
| case sqlredo.OpInsert: | ||
| m.Operation = replication.MessageOperationInsert.String() |
There was a problem hiding this comment.
Could we have typed m.Operation?
| if err := lm.sessionMgr.EndSession(ctx); err != nil { | ||
| lm.log.Errorf("ending LogMiner session on exit: %v", err) | ||
| } | ||
| lm.sessionActive = false |
There was a problem hiding this comment.
Maybe move session state to SessionManager
oracle_cdc: someting
|
Claude finished @josephwoodward's task in 5m 23s —— View job
Commits
Review Well-structured CDC implementation with good architectural separation (LogMiner engine, session manager, transaction buffer, SQL redo parser). Component registration ( LGTM |
8a7713d to
6f5383b
Compare
|
Claude finished @josephwoodward's task in 4m 40s —— View job
Commits
Review Well-structured CDC implementation with good architectural separation (LogMiner engine, session manager, transaction buffer, SQL redo parser). Component registration ( LGTM |

This pull request adds support for the CDC pattern via LogMiner. LogMiner is traditionally a tool for providing a relational interface to Oracle's Redo Log Files for the purposes of tasks such as data recovery and auditing, it's also commonly also used for supporting the change data capture pattern. As such, there's some additional work for enabling CDC via LogMiner which I'll break down below.
How CDC Works with LogMiner
From a high level, LogMiner works by loading redo logs then allowing SQL based access to the change events. From an architectural perspective the following core components are:
LogMiner (
logminer/logminer.go)The continuous streaming engine. Each mining cycle:
Session Manager (
logminer/session.go)This is responsible for loading log files into LogMiner as they're cycled. It does this in a mining loop that continuously checks for the latest latest SCN.
Transaction Buffer (
logminer/cache.go)Event rows queried via LogMiner begin with a transaction start and eventually end with a commit/rollback. This means as we're reading events we have to buffer them until we reach the associative commit or rollback. Rollbacks result in the buffer being discarded and commits result in the buffer being processed by the SQL Redo Parser (discussed below) before published to Benthos pipeline. As such, it's possible that long-running transactions on a high-throughput system can lead to memory exhaustion, so it's important to have controls in place to limit this (
max_transaction_eventsallows users to specify an upper limit).As mentioned, the current implementation is in-memory but it'd be good to add support for other storage mechanisms (such as redis, or potentially even OracleDB - similar to what we do with the checkpoint cache)
SQL Redo Parser (logminer/sqlredo/)
Parses the reconstructed SQL redo statements produced by LogMiner back into structured DMLEvent values (table, operation type, column values).
The queries we get back from log miner literally look like this:
In some instances they'll have functions in them such as below:
Based on the function name we have to perform conversions in Go (see
logminer/sqlredo/valueconverter_test.gofor tests).Things to follow up in separate PR