bufferAllocator of LogRecordReadContext leaks memory if an OOM occurs while decompressing data. #2646

@loserwang1024

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

0.8.0 (latest release)

Please describe the bug 🐞

If an OOM occurs while decompressing data:

2026-02-11 16:00:59,159 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) [flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:991) [?:?]
Caused by: org.apache.fluss.exception.FetchException: Received exception when fetching the next record from TableBucket{tableId=7, bucket=3}. If needed, please back to past the record to continue scanning.
	at org.apache.fluss.client.table.scanner.log.CompletedFetch.fetchRecords(CompletedFetch.java:187) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogFetchCollector.fetchRecords(LogFetchCollector.java:160) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogFetchCollector.collectFetch(LogFetchCollector.java:118) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogFetcher.collectFetch(LogFetcher.java:165) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogScannerImpl.pollForFetches(LogScannerImpl.java:234) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogScannerImpl.poll(LogScannerImpl.java:144) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.flink.source.reader.FlinkSourceSplitReader.fetch(FlinkSourceSplitReader.java:174) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
	... 6 more
Caused by: org.apache.fluss.shaded.arrow.org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:300) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:296) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:284) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:265) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:231) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.compression.ZstdArrowCompressionCodec.doDecompress(ZstdArrowCompressionCodec.java:88) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.AbstractCompressionCodec.decompress(AbstractCompressionCodec.java:77) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader.loadBuffers(VectorLoader.java:106) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:84) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.utils.ArrowUtils.createArrowReader(ArrowUtils.java:181) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.record.DefaultLogRecordBatch.columnRecordIterator(DefaultLogRecordBatch.java:345) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.record.DefaultLogRecordBatch.records(DefaultLogRecordBatch.java:231) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.record.FileLogInputStream$FileChannelLogRecordBatch.records(FileLogInputStream.java:169) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.CompletedFetch.nextFetchedRecord(CompletedFetch.java:220) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.CompletedFetch.fetchRecords(CompletedFetch.java:170) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogFetchCollector.fetchRecords(LogFetchCollector.java:160) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogFetchCollector.collectFetch(LogFetchCollector.java:118) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogFetcher.collectFetch(LogFetcher.java:165) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogScannerImpl.pollForFetches(LogScannerImpl.java:234) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogScannerImpl.poll(LogScannerImpl.java:144) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.flink.source.reader.FlinkSourceSplitReader.fetch(FlinkSourceSplitReader.java:174) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
	... 6 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
	at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
	at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
	at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:710) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:685) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:212) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:194) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:136) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:126) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:178) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:211) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:300) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:296) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:284) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:265) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:231) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.compression.ZstdArrowCompressionCodec.doDecompress(ZstdArrowCompressionCodec.java:88) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.AbstractCompressionCodec.decompress(AbstractCompressionCodec.java:77) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader.loadBuffers(VectorLoader.java:106) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:84) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.utils.ArrowUtils.createArrowReader(ArrowUtils.java:181) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.record.DefaultLogRecordBatch.columnRecordIterator(DefaultLogRecordBatch.java:345) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.record.DefaultLogRecordBatch.records(DefaultLogRecordBatch.java:231) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.record.FileLogInputStream$FileChannelLogRecordBatch.records(FileLogInputStream.java:169) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.CompletedFetch.nextFetchedRecord(CompletedFetch.java:220) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.CompletedFetch.fetchRecords(CompletedFetch.java:170) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogFetchCollector.fetchRecords(LogFetchCollector.java:160) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogFetchCollector.collectFetch(LogFetchCollector.java:118) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogFetcher.collectFetch(LogFetcher.java:165) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogScannerImpl.pollForFetches(LogScannerImpl.java:234) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.client.table.scanner.log.LogScannerImpl.poll(LogScannerImpl.java:144) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.flink.source.reader.FlinkSourceSplitReader.fetch(FlinkSourceSplitReader.java:174) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
	... 6 more

The allocator then reports leaked memory when it is closed:

2026-02-11 16:00:59,241 ERROR org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator [] - Memory was leaked by query. Memory leaked: (2112)
Allocator(ROOT) 0/2112/16777088/9223372036854775807 (res/actual/peak/limit)

2026-02-11 16:00:59,241 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (2112)
Allocator(ROOT) 0/2112/16777088/9223372036854775807 (res/actual/peak/limit)

	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:405) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.close(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
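
For readers unfamiliar with the accounting line in the log above, here is a small sketch that splits it into the four fields named by its own legend, `(res/actual/peak/limit)`. The class and method names are hypothetical helpers written for this illustration, not part of Fluss or Arrow, and the parsing assumes the exact layout shown in the log.

```java
class AllocatorLineSketch {

    /**
     * Parses a line such as
     * "Allocator(ROOT) 0/2112/16777088/9223372036854775807 (res/actual/peak/limit)"
     * and returns {res, actual, peak, limit}.
     */
    static long[] parse(String line) {
        // Tokens: allocator name, slash-separated numbers, legend.
        String[] tokens = line.trim().split("\\s+");
        String[] parts = tokens[1].split("/");
        long[] values = new long[parts.length];
        for (int i = 0; i < parts.length; i++) {
            values[i] = Long.parseLong(parts[i]);
        }
        return values;
    }

    public static void main(String[] args) {
        long[] v = parse(
                "Allocator(ROOT) 0/2112/16777088/9223372036854775807 (res/actual/peak/limit)");
        System.out.println("still allocated at close: " + v[1] + " bytes");
        System.out.println("peak allocation: " + v[2] + " bytes");
        System.out.println("limit is Long.MAX_VALUE: " + (v[3] == Long.MAX_VALUE));
    }
}
```

Read this way, the line says that 2112 bytes were still allocated when the root allocator was closed, and that the allocator's limit was `Long.MAX_VALUE`, so the allocation failure came from the JVM's direct memory rather than from the allocator's own limit.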

Reason

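These temporary ownBuffers (the buffers decompressed in VectorLoader.loadBuffers, per the stack trace above) cannot be released by VectorSchemaRoot because they have not yet been attached to a vector. If decompressing a later buffer throws, the buffers already decompressed are left without an owner and are never closed.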
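
A minimal sketch of the failure pattern described above, and of one possible way to avoid it, written without Arrow on the classpath. `TrackingAllocator` and `Buf` are hypothetical stand-ins for Arrow's allocator and buffer types, and the two `decompressAll...` methods only imitate the shape of a loop that collects temporary buffers. The actual fix in Fluss or Arrow may look different.

```java
import java.util.ArrayList;
import java.util.List;

class DecompressLeakSketch {

    /** Hypothetical stand-in for an allocator: it only tracks outstanding bytes. */
    static final class TrackingAllocator {
        private final long limit;
        private long allocated;

        TrackingAllocator(long limit) {
            this.limit = limit;
        }

        Buf buffer(long size) {
            if (allocated + size > limit) {
                throw new IllegalStateException("Failure allocating buffer.");
            }
            allocated += size;
            return new Buf(this, size);
        }

        long allocatedBytes() {
            return allocated;
        }
    }

    /** Hypothetical stand-in for a buffer that returns its bytes on close. */
    static final class Buf implements AutoCloseable {
        private final TrackingAllocator owner;
        private final long size;
        private boolean closed;

        Buf(TrackingAllocator owner, long size) {
            this.owner = owner;
            this.size = size;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                owner.allocated -= size;
            }
        }
    }

    /** Leaky shape: if a later allocation throws, earlier buffers are never closed. */
    static List<Buf> decompressAllLeaky(TrackingAllocator allocator, long[] sizes) {
        List<Buf> ownBuffers = new ArrayList<>();
        for (long size : sizes) {
            ownBuffers.add(allocator.buffer(size));
        }
        return ownBuffers;
    }

    /** Safer shape: on any failure, close what was already allocated, then rethrow. */
    static List<Buf> decompressAllSafe(TrackingAllocator allocator, long[] sizes) {
        List<Buf> ownBuffers = new ArrayList<>();
        try {
            for (long size : sizes) {
                ownBuffers.add(allocator.buffer(size));
            }
            return ownBuffers;
        } catch (RuntimeException | Error e) {
            for (Buf buf : ownBuffers) {
                buf.close();
            }
            throw e;
        }
    }

    /** Runs one failing pass and returns the bytes still outstanding afterwards. */
    static long leakedAfterFailure(boolean safe) {
        TrackingAllocator allocator = new TrackingAllocator(100);
        long[] sizes = {64, 64}; // the second allocation exceeds the limit
        try {
            if (safe) {
                decompressAllSafe(allocator, sizes);
            } else {
                decompressAllLeaky(allocator, sizes);
            }
        } catch (IllegalStateException expected) {
            // Allocation failure is the scenario under test.
        }
        return allocator.allocatedBytes();
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + leakedAfterFailure(false) + " bytes outstanding");
        System.out.println("safe:  " + leakedAfterFailure(true) + " bytes outstanding");
    }
}
```

In this sketch the leaky variant leaves the first 64-byte buffer outstanding after the second allocation fails, while the safe variant returns the allocator to zero before rethrowing. The cleanup catches `Error` as well as `RuntimeException` because the root cause in the trace above is a `java.lang.OutOfMemoryError`.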

Are you willing to submit a PR?

  • I'm willing to submit a PR!
