From a54f3ea649d1d03cf7607649bd3aa5f2ec6b35ab Mon Sep 17 00:00:00 2001 From: Aleksandr Iushmanov Date: Thu, 25 Jun 2026 15:15:44 +0100 Subject: [PATCH] [FLIP-597][filesystem] Add common object storage stream abstractions Generated by: Claude Code --- .../fs/BufferingInputStreamExtension.java | 53 ++ .../flink/core/fs/InputStreamExtension.java | 78 ++ .../flink/core/fs/InputStreamOpener.java | 52 ++ .../core/fs/ObjectStorageInputStream.java | 436 ++++++++++ .../core/fs/ObjectStorageOutputStream.java | 249 ++++++ .../flink/core/fs/OutputStreamOpener.java | 50 ++ .../core/fs/RawAndWrappedInputStreams.java | 58 ++ .../org/apache/flink/core/fs/ReadContext.java | 55 ++ .../apache/flink/core/fs/WriteContext.java | 64 ++ .../core/fs/ObjectStorageInputStreamTest.java | 753 ++++++++++++++++++ .../fs/ObjectStorageOutputStreamTest.java | 543 +++++++++++++ 11 files changed, 2391 insertions(+) create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/BufferingInputStreamExtension.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/InputStreamExtension.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/InputStreamOpener.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/ObjectStorageInputStream.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/ObjectStorageOutputStream.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamOpener.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/RawAndWrappedInputStreams.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/ReadContext.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/WriteContext.java create mode 100644 flink-core/src/test/java/org/apache/flink/core/fs/ObjectStorageInputStreamTest.java create mode 100644 flink-core/src/test/java/org/apache/flink/core/fs/ObjectStorageOutputStreamTest.java diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/BufferingInputStreamExtension.java b/flink-core/src/main/java/org/apache/flink/core/fs/BufferingInputStreamExtension.java new file mode 100644 index 00000000000000..63a0979921d37f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/BufferingInputStreamExtension.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import javax.annotation.concurrent.Immutable; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Default {@link InputStreamExtension} that opens the raw stream via an {@link InputStreamOpener} + * and wraps it with a {@link BufferedInputStream}. 
+ */ +@Internal +@Immutable +final class BufferingInputStreamExtension implements InputStreamExtension { + + private final InputStreamOpener opener; + private final int readBufferSize; + + BufferingInputStreamExtension(final InputStreamOpener opener, final int readBufferSize) { + this.opener = Preconditions.checkNotNull(opener, "opener"); + Preconditions.checkArgument(readBufferSize > 0, "readBufferSize must be positive"); + this.readBufferSize = readBufferSize; + } + + @Override + public RawAndWrappedInputStreams openStream(final InputStreamExtension.StreamContext ctx) + throws IOException { + final InputStream raw = opener.open(ReadContext.of(ctx.getPos())); + return new RawAndWrappedInputStreams(raw, new BufferedInputStream(raw, readBufferSize)); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/InputStreamExtension.java b/flink-core/src/main/java/org/apache/flink/core/fs/InputStreamExtension.java new file mode 100644 index 00000000000000..4d44a28f4a16dc --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/InputStreamExtension.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; + +import java.io.IOException; + +/** + * Extension point for {@link ObjectStorageInputStream}. + * + *

All methods are called with the stream's internal lock held. + * + * @see ObjectStorageInputStream + */ +@Internal +@Experimental +public interface InputStreamExtension { + + /** Read-only view of stream state passed to extension callbacks. */ + @Internal + @Experimental + interface StreamContext { + + /** Returns the current byte position. */ + long getPos(); + + /** Returns the total content length in bytes. */ + long getContentLength(); + } + + /** + * Opens streams at the current {@linkplain StreamContext#getPos() read position}. + * + *

Called during lazy initialization (first read) and stream recovery (seek beyond + * threshold). The previous streams are closed by the base class before this call. The base + * class manages the lifecycle of the returned streams (reads, closes, reopens on seek). + * + *

If the underlying source cannot do range reads (e.g., encrypted streams opened at offset + * 0), open at offset 0 and skip forward to {@link StreamContext#getPos()} after wrapping. + * + *

On failure, close any streams opened during this call before rethrowing. + * + * @param ctx read-only view of the stream state at the time of opening + * @return the raw and wrapped stream pair + * @throws IOException if opening fails + */ + RawAndWrappedInputStreams openStream(StreamContext ctx) throws IOException; + + /** + * Returns the default extension that opens the raw stream via {@code opener} and wraps it with + * a {@link java.io.BufferedInputStream}. + * + * @param opener opens a raw stream at a given byte position + * @param readBufferSize the buffer size for the {@link java.io.BufferedInputStream} + */ + static InputStreamExtension buffering( + final InputStreamOpener opener, final int readBufferSize) { + return new BufferingInputStreamExtension(opener, readBufferSize); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/InputStreamOpener.java b/flink-core/src/main/java/org/apache/flink/core/fs/InputStreamOpener.java new file mode 100644 index 00000000000000..d8705b3d04f448 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/InputStreamOpener.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Cloud-agnostic opener that returns an input stream starting at the position described by a {@link + * ReadContext}. + * + *

The implementation captures all cloud-specific state (client, path, bucket) in its closure, + * exposing only the read context to the caller. + * + * @see ReadContext + */ +@Internal +@Experimental +@NotThreadSafe +@FunctionalInterface +public interface InputStreamOpener { + + /** + * Opens an input stream starting at the position indicated by {@code ctx}. + * + * @param ctx context for this read, including the byte offset to start from + * @return an input stream positioned at {@code ctx.getPos()} + * @throws IOException if the stream cannot be opened + */ + InputStream open(ReadContext ctx) throws IOException; +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ObjectStorageInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ObjectStorageInputStream.java new file mode 100644 index 00000000000000..88e8999834b754 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/ObjectStorageInputStream.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Object storage input stream with configurable read-ahead buffer and range-based requests for seek + * operations. + * + *

Thread-safety: all public methods are guarded by a {@link ReentrantLock}. {@link #close()} is + * non-interruptible and safe to call from any thread (e.g., task cancellation). + * + *

Extension: use {@link InputStreamExtension#buffering(InputStreamOpener, int)} for the default + * behavior. Pass a custom {@link InputStreamExtension} to customize stream opening. + * + * @see InputStreamExtension + */ +@Internal +@Experimental +@ThreadSafe +public final class ObjectStorageInputStream extends FSDataInputStream { + + private final long contentLength; + private final long skipOnSeekThreshold; + + private final InputStreamExtension inputStreamExtension; + private final StreamContextImpl streamContext = new StreamContextImpl(); + + /** Lock guarding all mutable state. */ + private final ReentrantLock lock = new ReentrantLock(); + + @Nullable + @GuardedBy("lock") + private InputStream sdkStream; + + @Nullable + @GuardedBy("lock") + private InputStream wrappedStream; + + /** Current byte position. Written and read under {@code lock}. */ + @GuardedBy("lock") + private long position; + + @GuardedBy("lock") + private boolean closed; + + /** + * Creates a new object storage input stream. + * + *

Use {@link InputStreamExtension#buffering(InputStreamOpener, int)} for the default + * behavior (opens via the opener and wraps with {@link java.io.BufferedInputStream}). + * + * @param contentLength content length in bytes; must be ≥ 0 + * @param skipOnSeekThreshold maximum forward seek distance (bytes) handled by read-and-discard + * instead of close-and-reopen; must be non-negative (0 disables read-and-discard) + * @param inputStreamExtension extension for stream opening + */ + public ObjectStorageInputStream( + final long contentLength, + final long skipOnSeekThreshold, + final InputStreamExtension inputStreamExtension) { + Preconditions.checkArgument(contentLength >= 0, "contentLength must be non-negative"); + Preconditions.checkArgument( + skipOnSeekThreshold >= 0, "skipOnSeekThreshold must be non-negative"); + this.contentLength = contentLength; + this.skipOnSeekThreshold = skipOnSeekThreshold; + this.inputStreamExtension = Preconditions.checkNotNull(inputStreamExtension, "extension"); + this.position = 0; + } + + @GuardedBy("lock") + private void lazyInitialize() throws IOException { + checkLocked(); + if (sdkStream == null && !closed) { + openStream(); + } + } + + /** + * Closes the current streams and opens new ones at the current {@linkplain #getPos() position}. + * + *

If {@code openStream()} throws, the previous streams are already closed but no new streams + * are set — the next read will retry. + */ + @GuardedBy("lock") + private void openStream() throws IOException { + checkLocked(); + closeCurrentStream(); + final RawAndWrappedInputStreams pair = inputStreamExtension.openStream(streamContext); + this.sdkStream = pair.sdk(); + this.wrappedStream = pair.wrapped(); + } + + /** + * Closes the current underlying streams. If {@code wrappedStream} is present it is closed first + * (which also closes the wrapped {@code sdkStream}). If that close fails, a direct close of + * {@code sdkStream} is attempted so the remote connection is not leaked. + * + * @throws IOException if closing the stream(s) fails + */ + @GuardedBy("lock") + private void closeCurrentStream() throws IOException { + checkLocked(); + if (wrappedStream != null) { + try { + // Closing the wrapper typically closes the underlying stream as well. + wrappedStream.close(); + } catch (final IOException e) { + // wrappedStream.close() may have failed before closing the underlying stream. + // Try closing it directly so we don't leak the connection. + if (sdkStream != null) { + try { + sdkStream.close(); + } catch (final IOException suppressed) { + // Avoid self-suppression: the wrapper's close() delegates to + // sdkStream.close(), so both may throw the same exception instance. + if (suppressed != e) { + e.addSuppressed(suppressed); + } + } + } + throw e; + } finally { + wrappedStream = null; + sdkStream = null; + } + } else if (sdkStream != null) { + try { + sdkStream.close(); + } finally { + sdkStream = null; + } + } + } + + /** + * Size of the throwaway buffer used in read-and-discard seek. The exact size does not matter + * much: actual I/O granularity is governed by the SDK's internal chunk buffer, not this array. + */ + private static final int SKIP_BUFFER_SIZE = 8192; + + /** + * Seeks to the specified position in the stream. + * + *

For forward seeks within {@code skipOnSeekThreshold}, bytes are read-and-discarded from + * the current stream. This uses {@code read()} rather than {@code skip()} because {@link + * java.io.BufferedInputStream#skip} can delegate to the underlying SDK stream's {@code skip()}, + * which discards its internal buffer even when the target falls within already-fetched data. + * Reading-and-discarding flows through the normal read path, consuming bytes from the SDK's + * existing buffer without additional HTTP requests. + * + *

For larger forward seeks or any backward seek, the stream is closed and reopened lazily + * with a range request at the new position — this costs one HTTP request regardless of + * distance. + * + * @param desired the position to seek to (byte offset from beginning) + * @throws IOException if the position is negative, beyond the file size, or the stream is + * closed + */ + @Override + public void seek(final long desired) throws IOException { + acquireLockInterruptibly(); + try { + checkNotClosed(); + if (desired < 0 || desired > contentLength) { + throw new IOException( + "Cannot seek to position " + + desired + + " (file size: " + + contentLength + + ")"); + } + seekInternal(desired); + } finally { + lock.unlock(); + } + } + + @GuardedBy("lock") + private void seekInternal(final long desired) throws IOException { + checkLocked(); + if (desired == position) { + return; + } + final long delta = desired - position; + if (delta > 0 && delta <= skipOnSeekThreshold && wrappedStream != null) { + readAndDiscard(delta); + } else { + position = desired; + closeCurrentStream(); + } + } + + /** Advances the stream by reading-and-discarding {@code delta} bytes. */ + @GuardedBy("lock") + private void readAndDiscard(final long delta) throws IOException { + if (delta == 0) { + return; + } + checkLocked(); + Preconditions.checkState( + wrappedStream != null, "readAndDiscard requires an open wrapped stream"); + final byte[] skipBuf = new byte[(int) Math.min(delta, SKIP_BUFFER_SIZE)]; + long remaining = delta; + while (remaining > 0L) { + final int toRead = (int) Math.min(remaining, skipBuf.length); + final int bytesRead = wrappedStream.read(skipBuf, 0, toRead); + if (bytesRead <= 0) { + break; + } + remaining -= bytesRead; + position += bytesRead; + } + if (remaining > 0L) { + throw new IOException( + "Unexpected end of stream during read-and-discard seek: " + + remaining + + " bytes remaining"); + } + } + + /** + * Returns the current position in the stream. 
+ * + * @return the byte offset from the beginning of the stream + */ + @Override + public long getPos() { + lock.lock(); + try { + return position; + } finally { + lock.unlock(); + } + } + + /** + * Reads a single byte from the stream. + * + * @return the byte read as an integer (0-255), or -1 if end of stream is reached + * @throws IOException if the stream is closed or an I/O error occurs + */ + @Override + public int read() throws IOException { + acquireLockInterruptibly(); + try { + checkNotClosed(); + if (position >= contentLength) { + return -1; + } + lazyInitialize(); + final int data = wrappedStream.read(); + if (data != -1) { + position++; + } + return data; + } finally { + lock.unlock(); + } + } + + /** + * Reads bytes into a portion of a byte array. + * + * @param b the buffer into which the data is read + * @param off the start offset in array {@code b} at which the data is written + * @param len the maximum number of bytes to read + * @return the number of bytes read, or -1 if end of stream is reached + * @throws IOException if the stream is closed or an I/O error occurs + * @throws NullPointerException if {@code b} is null + * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or {@code len} + * is greater than {@code b.length - off} + */ + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + acquireLockInterruptibly(); + try { + checkNotClosed(); + Preconditions.checkNotNull(b, "buffer"); + Objects.checkFromIndexSize(off, len, b.length); + if (len == 0) { + return 0; + } + if (position >= contentLength) { + return -1; + } + lazyInitialize(); + final long remaining = contentLength - position; + final int toRead = (int) Math.min(len, remaining); + final int bytesRead = wrappedStream.read(b, off, toRead); + if (bytesRead > 0) { + position += bytesRead; + } + return bytesRead; + } finally { + lock.unlock(); + } + } + + /** + * Idempotently closes this stream and releases the 
underlying connection. + * + *

Uses non-interruptible {@code lock()} instead of {@code lockInterruptibly()} so that close + * always completes — stream cleanup must run to avoid resource leaks. + * + * @throws IOException if closing the underlying stream fails + */ + @Override + public void close() throws IOException { + lock.lock(); + try { + if (closed) { + return; + } + closed = true; + closeCurrentStream(); + } finally { + lock.unlock(); + } + } + + /** + * Returns an estimate of the number of bytes that can be read without blocking. + * + * @return the number of remaining bytes in the blob, capped at {@link Integer#MAX_VALUE} + * @throws IOException if the stream is closed + */ + @Override + public int available() throws IOException { + acquireLockInterruptibly(); + try { + checkNotClosed(); + final long remaining = Math.max(0L, contentLength - position); + return (int) Math.min(remaining, Integer.MAX_VALUE); + } finally { + lock.unlock(); + } + } + + /** + * Skips over and discards {@code n} bytes of data from this input stream. 
+ * + * @param n the number of bytes to skip; values {@code <= 0} skip nothing + * @return the actual number of bytes skipped (may be less if end of stream is reached) + * @throws IOException if the stream is closed or an I/O error occurs + */ + @Override + public long skip(final long n) throws IOException { + acquireLockInterruptibly(); + try { + checkNotClosed(); + if (n <= 0) { + return 0; + } + final long skipped = Math.min(n, contentLength - position); // overflow-safe clamp + final long newPos = position + skipped; + seekInternal(newPos); + return skipped; + } finally { + lock.unlock(); + } + } + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + private final class StreamContextImpl implements InputStreamExtension.StreamContext { + + private StreamContextImpl() {} + + @Override + public long getPos() { + checkLocked(); + return position; + } + + @Override + public long getContentLength() { + return contentLength; + } + } + + /** Asserts that the current thread holds the lock. */ + private void checkLocked() { + Preconditions.checkState(lock.isHeldByCurrentThread()); + } + + private void checkNotClosed() throws IOException { + checkLocked(); + if (closed) { + throw new IOException("Stream is closed"); + } + } + + /** + * Acquires the lock, wrapping {@link InterruptedException} as {@link IOException} while + * restoring the interrupt flag. 
+ */ + private void acquireLockInterruptibly() throws IOException { + try { + lock.lockInterruptibly(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting to acquire stream lock", e); + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ObjectStorageOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ObjectStorageOutputStream.java new file mode 100644 index 00000000000000..72f95d324415a8 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/ObjectStorageOutputStream.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Objects; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Object storage output stream that delegates writes to an underlying {@link OutputStream}. + * + *

The underlying stream may be a raw cloud stream or a wrapped stream (e.g., a compressing or + * encrypting stream). The delegate may buffer writes internally and upload blocks asynchronously. + * On {@link #close()} or {@link #sync()}, all buffered data is flushed and the stream is committed, + * making the file visible. + * + *

Thread-safety: all public methods are guarded by a {@link ReentrantLock}. {@link #close()} and + * {@link #sync()} use non-interruptible {@code lock()} so they always complete — stream cleanup + * must run to avoid resource leaks or silent data corruption. Other methods use {@code + * lockInterruptibly()} and throw {@link IOException} wrapping {@link InterruptedException} when + * interrupted. + * + * @see FSDataOutputStream + */ +@Internal +@Experimental +@ThreadSafe +public final class ObjectStorageOutputStream extends FSDataOutputStream { + + private final OutputStream delegate; + + /** Path to the file within the storage system. Used in diagnostic messages. */ + private final String filePath; + + /** Lock guarding all mutable state. */ + private final ReentrantLock lock = new ReentrantLock(); + + @GuardedBy("lock") + private long position; + + @GuardedBy("lock") + private boolean closed; + + /** + * Creates a new object storage output stream wrapping an already-opened delegate stream. + * + * @param delegate the underlying output stream (e.g., an SDK stream or a cipher-wrapped stream) + * @param filePath the path to the file within the storage system (used in diagnostic messages) + */ + public ObjectStorageOutputStream(final OutputStream delegate, final String filePath) { + this.delegate = Preconditions.checkNotNull(delegate, "delegate"); + this.filePath = Preconditions.checkNotNull(filePath, "filePath"); + this.position = 0; + } + + /** + * Returns the current position in the stream. + * + * @return the number of bytes written so far + * @throws IOException if interrupted while acquiring the lock, or if the stream is closed + */ + @Override + public long getPos() throws IOException { + acquireLockInterruptibly(); + try { + checkNotClosed(); + return position; + } finally { + lock.unlock(); + } + } + + /** + * Writes a single byte to the stream. 
+ * + * @param b the byte to write (only the low 8 bits are used) + * @throws IOException if interrupted while acquiring the lock, if the stream is closed, or if + * an I/O error occurs + */ + @Override + public void write(final int b) throws IOException { + acquireLockInterruptibly(); + try { + checkNotClosed(); + delegate.write(b); + position++; + } finally { + lock.unlock(); + } + } + + /** + * Writes bytes from the specified byte array to this output stream. + * + * @param b the data to write + * @param off the start offset in the data + * @param len the number of bytes to write + * @throws IOException if interrupted while acquiring the lock, if the stream is closed, or if + * an I/O error occurs + * @throws NullPointerException if {@code b} is null + * @throws IndexOutOfBoundsException if {@code off} or {@code len} is out of bounds + */ + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + acquireLockInterruptibly(); + try { + checkNotClosed(); + Preconditions.checkNotNull(b, "buffer"); + Objects.checkFromIndexSize(off, len, b.length); + delegate.write(b, off, len); + position += len; + } finally { + lock.unlock(); + } + } + + /** + * Flushes this output stream and forces any buffered data to be uploaded. + * + * @throws IOException if interrupted while acquiring the lock, if the stream is closed, or if + * an I/O error occurs + */ + @Override + public void flush() throws IOException { + acquireLockInterruptibly(); + try { + checkNotClosed(); + delegate.flush(); + } finally { + lock.unlock(); + } + } + + /** + * Flushes all buffered data and commits the file, making it visible to readers. + * + *

This closes the underlying stream, triggering the commit. After this method returns + * successfully, the file is durable and visible. No further writes are possible — subsequent + * write or flush calls will throw {@link IOException}. + * + *

Calling {@code sync()} on an already-synced or closed stream is a no-op. + * + *

Uses non-interruptible {@code lock()} so that the commit always completes even if the + * calling thread is interrupted. + * + * @throws IOException if the commit fails + */ + @Override + public void sync() throws IOException { + commitAndClose(); + } + + /** + * Closes this output stream and attempts to commit the file. + * + *

On the happy path, callers should use {@link #sync()} before close to guarantee data + * visibility. If {@code sync()} was already called, this method is a no-op. + * + *

This method is safe to call from any thread (e.g., task cancellation or safety-net + * cleanup). Uses non-interruptible {@code lock()} so that stream cleanup always completes. + * + * @throws IOException if closing the stream fails + */ + @Override + public void close() throws IOException { + commitAndClose(); + } + + private void commitAndClose() throws IOException { + lock.lock(); + try { + if (closed) { + return; + } + closed = true; + flushAndCloseDelegate(); + } finally { + lock.unlock(); + } + } + + private void flushAndCloseDelegate() throws IOException { + Exception primaryException = null; + try { + delegate.flush(); + } catch (final Exception e) { + primaryException = e; + } + try { + delegate.close(); + } catch (final Exception closeEx) { + if (primaryException != null) { + primaryException.addSuppressed(closeEx); + } else { + primaryException = closeEx; + } + } + if (primaryException != null) { + ExceptionUtils.rethrowIOException(primaryException); + } + } + + @GuardedBy("lock") + private void checkNotClosed() throws IOException { + Preconditions.checkState(lock.isHeldByCurrentThread()); + if (closed) { + throw new IOException("Stream already closed: " + filePath); + } + } + + /** + * Acquires the lock, wrapping {@link InterruptedException} as {@link IOException} while + * restoring the interrupt flag. 
+ */ + private void acquireLockInterruptibly() throws IOException { + try { + lock.lockInterruptibly(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + "Interrupted while waiting to acquire stream lock: " + filePath, e); + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamOpener.java b/flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamOpener.java new file mode 100644 index 00000000000000..c15cf2e39ba3dc --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamOpener.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Cloud-agnostic opener that creates an output stream for a write operation. + * + *

<p>The implementation captures all cloud-specific state (client, path, bucket) in its closure.
+ * The {@link WriteContext} provides metadata to attach to the cloud object before the stream is
+ * opened.
+ */
+@Internal
+@Experimental
+@NotThreadSafe
+@FunctionalInterface
+public interface OutputStreamOpener {
+
+    /**
+     * Opens an output stream for the write operation described by {@code ctx}.
+     *
+     * @param ctx context for this write, including metadata to attach to the cloud object
+     * @return an output stream for writing the file content
+     * @throws IOException if the stream cannot be opened
+     */
+    OutputStream open(WriteContext ctx) throws IOException;
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RawAndWrappedInputStreams.java b/flink-core/src/main/java/org/apache/flink/core/fs/RawAndWrappedInputStreams.java
new file mode 100644
index 00000000000000..bc0724c4a9f70c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RawAndWrappedInputStreams.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.InputStream; + +/** + * A pair of streams returned by {@link + * InputStreamExtension#openStream(InputStreamExtension.StreamContext)}. + */ +@Internal +@Experimental +public final class RawAndWrappedInputStreams { + + private final InputStream sdk; + private final InputStream wrapped; + + /** + * Creates a new stream pair. + * + * @param sdk the raw SDK stream + * @param wrapped the wrapped stream used for reads + */ + public RawAndWrappedInputStreams(final InputStream sdk, final InputStream wrapped) { + this.sdk = Preconditions.checkNotNull(sdk, "sdk"); + this.wrapped = Preconditions.checkNotNull(wrapped, "wrapped"); + } + + /** Returns the raw SDK stream. */ + public InputStream sdk() { + return sdk; + } + + /** Returns the wrapped stream used for reads. */ + public InputStream wrapped() { + return wrapped; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ReadContext.java b/flink-core/src/main/java/org/apache/flink/core/fs/ReadContext.java new file mode 100644 index 00000000000000..3d38b950b83c32 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/ReadContext.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import javax.annotation.concurrent.Immutable; + +/** + * Context passed to {@link InputStreamOpener} describing the desired read position. + * + * @see InputStreamOpener + */ +@Internal +@Experimental +@Immutable +public interface ReadContext { + + /** + * Returns the byte offset at which the stream should start. + * + * @return byte offset; must be ≥ 0 + */ + long getPos(); + + /** + * Creates a {@link ReadContext} for the given byte position. + * + * @param pos byte offset; must be ≥ 0 + * @return a {@link ReadContext} that returns {@code pos} + * @throws IllegalArgumentException if {@code pos} is negative + */ + static ReadContext of(final long pos) { + Preconditions.checkArgument(pos >= 0, "pos must be >= 0"); + return () -> pos; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/WriteContext.java b/flink-core/src/main/java/org/apache/flink/core/fs/WriteContext.java new file mode 100644 index 00000000000000..262db64f71a719 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/WriteContext.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import javax.annotation.concurrent.Immutable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Context passed to {@link OutputStreamOpener} describing the write operation, including metadata + * to be persisted with the cloud object. + * + * @see OutputStreamOpener + */ +@Internal +@Experimental +@Immutable +public interface WriteContext { + + /** + * Returns the metadata to attach to the cloud object (e.g., encryption headers). + * + * @return key-value metadata map; never {@code null} + */ + Map getMetadata(); + + /** Shared empty context — avoids allocation for plain writes without metadata. */ + WriteContext EMPTY_WRITE_CONTEXT = Collections::emptyMap; + + /** + * Creates a {@link WriteContext} backed by a defensive copy of the given metadata map. 
+ * + * @param metadata the metadata to expose; must not be {@code null} + * @return a {@link WriteContext} that returns an unmodifiable copy of {@code metadata} + * @throws NullPointerException if {@code metadata} is {@code null} + */ + static WriteContext of(final Map metadata) { + Preconditions.checkNotNull(metadata, "metadata"); + final Map copy = Collections.unmodifiableMap(new HashMap<>(metadata)); + return () -> copy; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/ObjectStorageInputStreamTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/ObjectStorageInputStreamTest.java new file mode 100644 index 00000000000000..1701a9b2fc9038 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/fs/ObjectStorageInputStreamTest.java @@ -0,0 +1,753 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link ObjectStorageInputStream}. */ +@Timeout(value = 10, unit = TimeUnit.MINUTES) +class ObjectStorageInputStreamTest { + + private static final int BUFFER_SIZE = 1024; + private static final long SKIP_THRESHOLD = 4 * 1024 * 1024; + + /** Poll interval for {@link #awaitBlocked}. */ + private static final long STATE_POLL_INTERVAL_MS = 100; + + private ObjectStorageInputStream stream; + + @AfterEach + void tearDown() throws Exception { + if (stream != null) { + stream.close(); + } + } + + /** + * Creates an opener that returns a {@link ByteArrayInputStream} starting at the requested + * position. 
+ */ + private static InputStreamOpener testOpener(final byte[] data) { + return ctx -> { + final int pos = (int) Math.min(ctx.getPos(), data.length); + return new ByteArrayInputStream(data, pos, data.length - pos); + }; + } + + private static InputStreamExtension buffering(final byte[] data) { + return InputStreamExtension.buffering(testOpener(data), BUFFER_SIZE); + } + + private static InputStreamExtension buffering(final InputStreamOpener opener) { + return InputStreamExtension.buffering(opener, BUFFER_SIZE); + } + + private ObjectStorageInputStream createStream(final byte[] data) { + stream = new ObjectStorageInputStream(data.length, SKIP_THRESHOLD, buffering(data)); + return stream; + } + + /** InputStream that tracks how many times {@link #close()} is called. */ + private static final class TrackingInputStream extends ByteArrayInputStream { + private int closeCount; + + TrackingInputStream(final byte[] data) { + super(data); + } + + @Override + public void close() throws IOException { + closeCount++; + super.close(); + } + } + + /** Opener that tracks how many times it has been called. */ + private static final class TrackingOpener implements InputStreamOpener { + private final byte[] data; + int openCount; + + TrackingOpener(final byte[] data) { + this.data = data; + } + + @Override + public InputStream open(final ReadContext ctx) { + openCount++; + final int pos = (int) Math.min(ctx.getPos(), data.length); + return new ByteArrayInputStream(data, pos, data.length - pos); + } + } + + /** Extension for tests that opens streams via the provided opener. 
*/ + private static final class TestingExtension implements InputStreamExtension { + private final InputStreamOpener opener; + + TestingExtension(final InputStreamOpener opener) { + this.opener = opener; + } + + @Override + public RawAndWrappedInputStreams openStream(final StreamContext ctx) throws IOException { + final InputStream raw = opener.open(ReadContext.of(ctx.getPos())); + return new RawAndWrappedInputStreams(raw, new BufferedInputStream(raw, BUFFER_SIZE)); + } + } + + // --- Constructor validation --- + + @Test + void shouldThrowOnNullExtension() { + assertThatThrownBy(() -> new ObjectStorageInputStream(100L, SKIP_THRESHOLD, null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("extension"); + } + + @Test + void shouldThrowOnNegativeContentLength() { + assertThatThrownBy( + () -> + new ObjectStorageInputStream( + -1L, SKIP_THRESHOLD, buffering(new byte[0]))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("non-negative"); + } + + @Test + void shouldThrowOnNegativeSkipThreshold() { + assertThatThrownBy(() -> new ObjectStorageInputStream(100L, -1L, buffering(new byte[0]))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("non-negative"); + } + + @Test + void shouldAcceptZeroSkipThresholdForEmptyFile() throws Exception { + stream = new ObjectStorageInputStream(0L, 0L, buffering(new byte[0])); + assertThat(stream.available()).isEqualTo(0); + assertThat(stream.read()).isEqualTo(-1); + } + + // --- Lazy initialization --- + + @Test + void shouldNotOpenStreamOnConstruction() { + final TrackingOpener opener = new TrackingOpener(new byte[0]); + stream = new ObjectStorageInputStream(100L, SKIP_THRESHOLD, buffering(opener)); + assertThat(opener.openCount).isEqualTo(0); + } + + @Test + void shouldOpenStreamOnFirstRead() throws Exception { + final byte[] data = {42}; + final TrackingOpener opener = new TrackingOpener(data); + + stream = new ObjectStorageInputStream(data.length, SKIP_THRESHOLD, 
buffering(opener)); + assertThat(opener.openCount).isEqualTo(0); + stream.read(); + assertThat(opener.openCount).isEqualTo(1); + } + + // --- Single byte read --- + + @Test + void shouldReadSingleByte() throws Exception { + createStream(new byte[] {42}); + assertThat(stream.read()).isEqualTo(42); + } + + @ParameterizedTest + @MethodSource("eofScenarios") + void shouldReturnMinusOneAtEof(final byte[] data) throws Exception { + createStream(data); + for (int i = 0; i < data.length; i++) { + stream.read(); + } + assertThat(stream.read()).isEqualTo(-1); + } + + private static Stream eofScenarios() { + return Stream.of(Arguments.of((Object) new byte[] {1}), Arguments.of((Object) new byte[0])); + } + + // --- Byte array read --- + + @Test + void shouldReadByteArray() throws Exception { + final byte[] data = {1, 2, 3, 4, 5}; + createStream(data); + final byte[] buf = new byte[5]; + final int read = stream.read(buf, 0, 5); + assertThat(read).isEqualTo(5); + assertThat(buf).isEqualTo(data); + } + + @Test + void shouldReturnMinusOneForByteArrayReadAtEof() throws Exception { + final byte[] data = {1}; + createStream(data); + final byte[] buf = new byte[1]; + stream.read(buf, 0, 1); + assertThat(stream.read(buf, 0, 1)).isEqualTo(-1); + } + + @Test + void shouldReturnZeroForZeroLengthRead() throws Exception { + createStream(new byte[] {1, 2, 3}); + final byte[] buf = new byte[3]; + assertThat(stream.read(buf, 0, 0)).isEqualTo(0); + } + + @Test + void shouldThrowOnNullArrayRead() throws Exception { + createStream(new byte[0]); + assertThatThrownBy(() -> stream.read(null, 0, 1)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("buffer"); + } + + @ParameterizedTest + @MethodSource("boundsViolations") + void shouldThrowOnBoundsViolation(final int bufLen, final int off, final int len) + throws Exception { + createStream(new byte[0]); + final byte[] buf = new byte[bufLen]; + assertThatThrownBy(() -> stream.read(buf, off, len)) + 
.isInstanceOf(IndexOutOfBoundsException.class); + } + + private static Stream boundsViolations() { + return Stream.of( + Arguments.of(5, -1, 1), + Arguments.of(5, 0, -1), + Arguments.of(5, 3, 5), + Arguments.of(5, 6, 1)); + } + + // --- Position tracking --- + + @Test + void shouldTrackPositionAfterSingleByteReads() throws Exception { + createStream(new byte[] {1, 2, 3}); + assertThat(stream.getPos()).isEqualTo(0); + stream.read(); + assertThat(stream.getPos()).isEqualTo(1); + stream.read(); + assertThat(stream.getPos()).isEqualTo(2); + } + + @Test + void shouldTrackPositionAfterArrayRead() throws Exception { + createStream(new byte[] {1, 2, 3, 4, 5}); + final byte[] buf = new byte[3]; + stream.read(buf, 0, 3); + assertThat(stream.getPos()).isEqualTo(3); + } + + // --- Seek --- + + @Test + void shouldSeekForward() throws Exception { + final byte[] data = {1, 2, 3, 4, 5}; + createStream(data); + stream.read(); // initialize + stream.seek(3); + assertThat(stream.getPos()).isEqualTo(3); + assertThat(stream.read()).isEqualTo(4); + } + + @Test + void shouldSeekBackward() throws Exception { + final byte[] data = {1, 2, 3, 4, 5}; + createStream(data); + stream.read(); + stream.read(); + stream.read(); + assertThat(stream.getPos()).isEqualTo(3); + + stream.seek(1); + assertThat(stream.getPos()).isEqualTo(1); + assertThat(stream.read()).isEqualTo(2); + } + + @Test + void shouldNotReopenStreamOnSeekToSamePosition() throws Exception { + final byte[] data = {1, 2, 3}; + final TrackingOpener opener = new TrackingOpener(data); + + stream = new ObjectStorageInputStream(data.length, SKIP_THRESHOLD, buffering(opener)); + stream.read(); // initialize, position=1 + stream.seek(1); // same position — should not reopen + assertThat(opener.openCount).isEqualTo(1); + } + + @Test + void shouldThrowOnNegativeSeek() throws Exception { + createStream(new byte[0]); + assertThatThrownBy(() -> stream.seek(-1)).isInstanceOf(IOException.class); + } + + @Test + void shouldThrowOnSeekAfterClose() 
throws Exception { + createStream(new byte[0]); + stream.close(); + assertThatThrownBy(() -> stream.seek(0)).isInstanceOf(IOException.class); + } + + // --- Skip --- + + @Test + void shouldSkipForward() throws Exception { + createStream(new byte[] {1, 2, 3, 4, 5}); + final long skipped = stream.skip(3); + assertThat(skipped).isEqualTo(3); + assertThat(stream.getPos()).isEqualTo(3); + } + + @Test + void shouldCapSkipAtContentLength() throws Exception { + createStream(new byte[] {1, 2, 3}); + final long skipped = stream.skip(100); + assertThat(skipped).isEqualTo(3); + assertThat(stream.getPos()).isEqualTo(3); + } + + @Test + void shouldReturnZeroForNonPositiveSkip() throws Exception { + stream = new ObjectStorageInputStream(100L, SKIP_THRESHOLD, buffering(new byte[0])); + assertThat(stream.skip(0)).isEqualTo(0); + assertThat(stream.skip(-5)).isEqualTo(0); + } + + @Test + void shouldSkipAfterSeek() throws Exception { + final byte[] data = {1, 2, 3, 4, 5}; + createStream(data); + stream.seek(2); + final long skipped = stream.skip(2); + assertThat(skipped).isEqualTo(2); + assertThat(stream.getPos()).isEqualTo(4); + assertThat(stream.read()).isEqualTo(5); + } + + // --- Seek optimization --- + + @Test + void shouldUseBufferForSmallForwardSeek() throws Exception { + final byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + final TrackingOpener opener = new TrackingOpener(data); + + stream = new ObjectStorageInputStream(data.length, SKIP_THRESHOLD, buffering(opener)); + stream.read(); // trigger initialization, openCount=1 + assertThat(opener.openCount).isEqualTo(1); + + stream.seek(3); // small forward delta (2 bytes), should NOT reopen + assertThat(opener.openCount).isEqualTo(1); + assertThat(stream.getPos()).isEqualTo(3); + assertThat(stream.read()).isEqualTo(4); + } + + @Test + void shouldReopenForLargeForwardSeek() throws Exception { + final int fileSize = (int) (SKIP_THRESHOLD + 1024); + final byte[] data = new byte[fileSize]; + for (int i = 0; i < data.length; i++) { + 
data[i] = (byte) (i & 0xFF); + } + final TrackingOpener opener = new TrackingOpener(data); + + stream = new ObjectStorageInputStream(data.length, SKIP_THRESHOLD, buffering(opener)); + stream.read(); // trigger initialization, openCount=1 + assertThat(opener.openCount).isEqualTo(1); + + // delta > SKIP_THRESHOLD, closes stream + stream.seek(SKIP_THRESHOLD + 512); + assertThat(stream.getPos()).isEqualTo(SKIP_THRESHOLD + 512); + stream.read(); // lazy reopen on read + assertThat(opener.openCount).isEqualTo(2); + } + + @Test + void shouldReopenForBackwardSeek() throws Exception { + final byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + final TrackingOpener opener = new TrackingOpener(data); + + stream = new ObjectStorageInputStream(data.length, SKIP_THRESHOLD, buffering(opener)); + stream.read(); + stream.read(); + stream.read(); // position=3, openCount=1 + + stream.seek(1); // backward seek, closes stream + assertThat(stream.read()).isEqualTo(2); // lazy reopen on read + assertThat(opener.openCount).isEqualTo(2); + } + + @Test + void shouldThrowWhenReadAndDiscardEncountersTruncatedStream() throws Exception { + final byte[] fullData = {1, 2, 3, 4, 5, 6, 7, 8}; + final int bufferSize = 8; + // Opener returns only 2 bytes regardless of position + final InputStreamOpener opener = + ctx -> + new ByteArrayInputStream( + fullData, (int) ctx.getPos(), 2 - (int) ctx.getPos()); + + stream = + new ObjectStorageInputStream( + fullData.length, + SKIP_THRESHOLD, + InputStreamExtension.buffering(opener, bufferSize)); + stream.read(); // position=1, buffer has 1 byte from truncated stream + + // Seek forward within threshold triggers read-and-discard, which hits EOF prematurely + assertThatThrownBy(() -> stream.seek(3)) + .isInstanceOf(IOException.class) + .hasMessageContaining("Unexpected end of stream during read-and-discard seek"); + } + + // --- Close --- + + @Test + void shouldCloseUnderlyingStreams() throws Exception { + final byte[] data = {1, 2, 3}; + final 
TrackingInputStream tracking = new TrackingInputStream(data); + final InputStreamOpener opener = ctx -> tracking; + + stream = new ObjectStorageInputStream(data.length, SKIP_THRESHOLD, buffering(opener)); + stream.read(); // trigger initialization + stream.close(); + // BufferedInputStream.close() closes the underlying stream once. + assertThat(tracking.closeCount).isEqualTo(1); + } + + @Test + void shouldCloseSuccessfullyWhenThreadIsInterrupted() throws Exception { + createStream(new byte[] {1, 2, 3}); + stream.read(); + + Thread.currentThread().interrupt(); + stream.close(); + + assertThat(Thread.interrupted()).isTrue(); + assertThatThrownBy(stream::read).isInstanceOf(IOException.class); + } + + @Test + void shouldHandleDoubleCloseGracefully() throws Exception { + stream = new ObjectStorageInputStream(100L, SKIP_THRESHOLD, buffering(new byte[0])); + stream.close(); + stream.close(); // should not throw + } + + @ParameterizedTest + @MethodSource("closedStreamOperations") + void shouldThrowOnOperationAfterClose(final StreamOperation operation) throws Exception { + stream = new ObjectStorageInputStream(100L, SKIP_THRESHOLD, buffering(new byte[0])); + stream.close(); + assertThatThrownBy(() -> operation.execute(stream)).isInstanceOf(IOException.class); + } + + @FunctionalInterface + interface StreamOperation { + void execute(ObjectStorageInputStream stream) throws Exception; + } + + private static Stream closedStreamOperations() { + return Stream.of( + Arguments.of((StreamOperation) ObjectStorageInputStream::read), + Arguments.of((StreamOperation) s -> s.read(new byte[1], 0, 1)), + Arguments.of((StreamOperation) s -> s.seek(0)), + Arguments.of((StreamOperation) s -> s.skip(1)), + Arguments.of((StreamOperation) ObjectStorageInputStream::available)); + } + + // --- Concurrency --- + + @Test + void shouldInterruptLockAcquisitionAndThrowIOException() throws Exception { + final CountDownLatch openerBlocked = new CountDownLatch(1); + final CountDownLatch openerRelease = 
new CountDownLatch(1); + final InputStreamOpener blockingOpener = + blockingOpener(new byte[] {1, 2, 3}, openerBlocked, openerRelease); + stream = new ObjectStorageInputStream(3L, SKIP_THRESHOLD, buffering(blockingOpener)); + + // First thread: holds the lock while blocked inside the opener. + final CompletableFuture holder = + CompletableFuture.runAsync( + () -> { + try { + stream.read(); + } catch (final IOException ignored) { + } + }); + openerBlocked.await(); + + // Second thread: blocks on lock acquisition — we need the raw Thread for interrupt(). + final AtomicReference readerError = new AtomicReference<>(); + final Thread readerThread = + new Thread( + () -> { + try { + stream.read(); + } catch (final IOException e) { + readerError.set(e); + } + }); + readerThread.start(); + awaitBlocked(readerThread); + + readerThread.interrupt(); + readerThread.join(); + + openerRelease.countDown(); + holder.join(); + + assertThat(readerThread.isAlive()).isFalse(); + assertThat(readerError.get()) + .isInstanceOf(IOException.class) + .hasMessageContaining("Interrupted"); + } + + /** + * Creates an opener that blocks on {@code blocked}/{@code release} latches on first call, + * allowing tests to hold the stream's lock from a background thread. 
+ */ + private static InputStreamOpener blockingOpener( + final byte[] data, final CountDownLatch blocked, final CountDownLatch release) { + return ctx -> { + blocked.countDown(); + try { + release.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted in blocking opener", e); + } + final int pos = (int) Math.min(ctx.getPos(), data.length); + return new ByteArrayInputStream(data, pos, data.length - pos); + }; + } + + private static void awaitBlocked(final Thread thread) throws InterruptedException { + while (thread.getState() != Thread.State.WAITING) { + Thread.sleep(STATE_POLL_INTERVAL_MS); + } + } + + // --- Available --- + + @Test + void shouldReturnRemainingBytesFromAvailable() throws Exception { + final byte[] data = {1, 2, 3, 4, 5}; + createStream(data); + assertThat(stream.available()).isEqualTo(5); + stream.read(); + assertThat(stream.available()).isEqualTo(4); + } + + @Test + void shouldCapAvailableAtIntegerMaxValue() throws Exception { + stream = + new ObjectStorageInputStream( + Long.MAX_VALUE, SKIP_THRESHOLD, buffering(new byte[0])); + assertThat(stream.available()).isEqualTo(Integer.MAX_VALUE); + } + + @Test + void shouldReturnZeroAvailableAfterSeekToEof() throws Exception { + final byte[] data = {1, 2, 3}; + createStream(data); + stream.seek(data.length); + assertThat(stream.available()).isEqualTo(0); + } + + @Test + void shouldThrowOnSeekBeyondContentLength() throws Exception { + final byte[] data = {1, 2, 3}; + createStream(data); + assertThatThrownBy(() -> stream.seek(data.length + 1)) + .isInstanceOf(IOException.class) + .hasMessageContaining("file size"); + } + + @Test + void shouldThrowOnAvailableAfterClose() throws Exception { + stream = new ObjectStorageInputStream(100L, SKIP_THRESHOLD, buffering(new byte[0])); + stream.close(); + assertThatThrownBy(stream::available).isInstanceOf(IOException.class); + } + + // --- Opener failure --- + + @Test + void 
shouldPropagateOpenerIOException() { + final InputStreamOpener failingOpener = + ctx -> { + throw new IOException("connection refused"); + }; + stream = new ObjectStorageInputStream(100L, SKIP_THRESHOLD, buffering(failingOpener)); + assertThatThrownBy(stream::read).isInstanceOf(IOException.class); + } + + @Test + void shouldPropagateOpenerRuntimeException() { + final RuntimeException expectedException = new RuntimeException("unexpected"); + final InputStreamOpener failingOpener = + ctx -> { + throw expectedException; + }; + stream = new ObjectStorageInputStream(100L, SKIP_THRESHOLD, buffering(failingOpener)); + assertThatThrownBy(stream::read) + .isInstanceOf(RuntimeException.class) + .isSameAs(expectedException); + } + + // --- Edge cases --- + + @Test + void shouldHandleZeroContentLength() throws Exception { + createStream(new byte[0]); + assertThat(stream.available()).isEqualTo(0); + assertThat(stream.read()).isEqualTo(-1); + final byte[] buf = new byte[1]; + assertThat(stream.read(buf, 0, 1)).isEqualTo(-1); + } + + @Test + void shouldNotOpenStreamWhenAlreadyAtEof() throws Exception { + final TrackingOpener opener = new TrackingOpener(new byte[0]); + stream = new ObjectStorageInputStream(0L, SKIP_THRESHOLD, buffering(opener)); + stream.read(); + assertThat(opener.openCount).isEqualTo(0); + } + + // --- Truncated stream (contentLength exceeds actual data) --- + + @Test + void shouldReturnMinusOneFromSingleByteReadWhenStreamTruncated() throws Exception { + final byte[] actualData = {1, 2}; + final long reportedLength = 10; + stream = + new ObjectStorageInputStream(reportedLength, SKIP_THRESHOLD, buffering(actualData)); + assertThat(stream.read()).isEqualTo(1); + assertThat(stream.read()).isEqualTo(2); + assertThat(stream.read()).isEqualTo(-1); + assertThat(stream.getPos()).isEqualTo(2); + } + + @Test + void shouldReturnMinusOneFromArrayReadWhenStreamTruncated() throws Exception { + final byte[] actualData = {1, 2}; + final long reportedLength = 10; + stream = + 
new ObjectStorageInputStream(reportedLength, SKIP_THRESHOLD, buffering(actualData)); + final byte[] buf = new byte[5]; + final int firstRead = stream.read(buf, 0, 5); + assertThat(firstRead).isEqualTo(2); + assertThat(stream.read(buf, 0, 5)).isEqualTo(-1); + assertThat(stream.getPos()).isEqualTo(2); + } + + // --- closeCurrentStream error handling --- + + @Test + void shouldPropagateCloseExceptionFromUnderlyingStream() throws Exception { + final IOException closeFailure = new IOException("close failed"); + final InputStreamOpener opener = + ctx -> + new ByteArrayInputStream(new byte[] {1, 2, 3}) { + @Override + public void close() throws IOException { + throw closeFailure; + } + }; + + stream = new ObjectStorageInputStream(3L, SKIP_THRESHOLD, buffering(opener)); + stream.read(); // trigger lazy init + + assertThatThrownBy(stream::close).isInstanceOf(IOException.class).isSameAs(closeFailure); + } + + // --- InputStreamExtension: openStream --- + + @Test + void shouldCallExtensionOpenStreamOnInit() throws Exception { + final byte[] data = {1, 2, 3}; + final TrackingOpener opener = new TrackingOpener(data); + final InputStreamExtension extension = new TestingExtension(opener); + + stream = new ObjectStorageInputStream(data.length, SKIP_THRESHOLD, extension); + assertThat(opener.openCount).isEqualTo(0); + final int ignore = stream.read(); + assertThat(opener.openCount).isEqualTo(1); + } + + @Test + void shouldCallExtensionOpenStreamOnReopen() throws Exception { + final byte[] data = {1, 2, 3, 4, 5}; + final TrackingOpener opener = new TrackingOpener(data); + final InputStreamExtension extension = new TestingExtension(opener); + + stream = new ObjectStorageInputStream(data.length, SKIP_THRESHOLD, extension); + stream.read(); // triggers init, openCount=1 + stream.seek(0); // backward seek forces reopen on next read + stream.read(); // triggers reopen, openCount=2 + assertThat(opener.openCount).isEqualTo(2); + } + + @Test + void shouldUseDefaultBufferingExtension() 
throws Exception { + final byte[] data = {1, 2, 3, 4, 5}; + stream = new ObjectStorageInputStream(data.length, SKIP_THRESHOLD, buffering(data)); + final byte[] buf = new byte[5]; + final int read = stream.read(buf, 0, 5); + assertThat(read).isEqualTo(5); + assertThat(buf).isEqualTo(data); + assertThat(stream.getPos()).isEqualTo(5); + } + + @Test + void shouldPropagateExtensionOpenStreamException() { + final IOException openFailure = new IOException("open failed"); + final InputStreamExtension extension = + new TestingExtension( + ctx -> { + throw openFailure; + }); + + stream = new ObjectStorageInputStream(1L, SKIP_THRESHOLD, extension); + assertThatThrownBy(stream::read).isSameAs(openFailure); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/ObjectStorageOutputStreamTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/ObjectStorageOutputStreamTest.java new file mode 100644 index 00000000000000..538394501de2ea --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/fs/ObjectStorageOutputStreamTest.java @@ -0,0 +1,543 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.core.fs;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link ObjectStorageOutputStream}.
+ *
+ * <p>Every test stores the stream it creates in {@link #stream} so that {@link #tearDown()} can
+ * close it on a best-effort basis.
+ */
+@Timeout(value = 10, unit = TimeUnit.MINUTES)
+class ObjectStorageOutputStreamTest {
+
+    private static final String FILE_PATH = "test/output.txt";
+
+    /** Stream under test; assigned by {@link #createStream(OutputStream)}. */
+    private ObjectStorageOutputStream stream;
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (stream != null) {
+            try {
+                stream.close();
+            } catch (final Exception ignored) {
+                // stream may already be closed or in a failed state
+            }
+        }
+    }
+
+    /**
+     * Test double that captures the written bytes and records the order of {@code flush()} and
+     * {@code close()} calls.
+     */
+    private static final class RecordingOutputStream extends OutputStream {
+        private final ByteArrayOutputStream delegate = new ByteArrayOutputStream();
+        private final List<String> operations = new ArrayList<>();
+        // volatile: some tests read these flags while another thread may still be running
+        private volatile boolean flushed;
+        private volatile boolean closed;
+
+        @Override
+        public void write(final int b) throws IOException {
+            delegate.write(b);
+        }
+
+        @Override
+        public void write(final byte[] b, final int off, final int len) throws IOException {
+            delegate.write(b, off, len);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            flushed = true;
+            operations.add("flush");
+            delegate.flush();
+        }
+
+        @Override
+        public void close() throws IOException {
+            closed = true;
+            operations.add("close");
+            delegate.close();
+        }
+
+        /** Returns a copy of everything written so far. */
+        byte[] getWrittenData() {
+            return delegate.toByteArray();
+        }
+    }
+
+    /** Creates the stream under test on top of a throw-away in-memory delegate. */
+    private ObjectStorageOutputStream createStream() {
+        return createStream(new ByteArrayOutputStream());
+    }
+
+    /** Creates the stream under test on top of {@code outputStream} and remembers it for tear-down. */
+    private ObjectStorageOutputStream createStream(final OutputStream outputStream) {
+        stream = new ObjectStorageOutputStream(outputStream, FILE_PATH);
+        return stream;
+    }
+
+    // --- Constructor validation ---
+
+    @Test
+    void shouldThrowOnNullDelegate() {
+        assertThatThrownBy(() -> new ObjectStorageOutputStream(null, FILE_PATH))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("delegate");
+    }
+
+    @Test
+    void shouldThrowOnNullFilePath() {
+        assertThatThrownBy(() -> new ObjectStorageOutputStream(new ByteArrayOutputStream(), null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("filePath");
+    }
+
+    // --- Write operations ---
+
+    @Test
+    void shouldWriteSingleByte() throws Exception {
+        createStream();
+        stream.write(42);
+        assertThat(stream.getPos()).isEqualTo(1);
+    }
+
+    @Test
+    void shouldWriteByteArray() throws Exception {
+        final byte[] data = {1, 2, 3, 4, 5};
+        createStream();
+        stream.write(data, 0, data.length);
+        assertThat(stream.getPos()).isEqualTo(5);
+    }
+
+    @Test
+    void shouldWriteByteArrayWithOffset() throws Exception {
+        final byte[] data = {1, 2, 3, 4, 5};
+        createStream();
+        stream.write(data, 2, 3);
+        assertThat(stream.getPos()).isEqualTo(3);
+    }
+
+    @Test
+    void shouldTrackPositionAcrossMultipleWrites() throws Exception {
+        createStream();
+        stream.write(1);
+        stream.write(new byte[] {2, 3, 4}, 0, 3);
+        assertThat(stream.getPos()).isEqualTo(4);
+    }
+
+    @Test
+    void shouldDelegateSingleArgWriteToDelegateStream() throws Exception {
+        // tests the inherited write(byte[]) -> write(byte[], 0, len) delegation
+        createStream();
+        final byte[] data = {1, 2, 3};
+        stream.write(data);
+        assertThat(stream.getPos()).isEqualTo(3);
+    }
+
+    @Test
+    void shouldHandleZeroLengthWrite() throws Exception {
+        createStream();
+        stream.write(new byte[5], 0, 0);
+        assertThat(stream.getPos()).isEqualTo(0);
+    }
+
+    @Test
+    void shouldThrowOnNullArrayWrite() throws Exception {
+        createStream();
+        assertThatThrownBy(() -> stream.write(null, 0, 1))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("buffer");
+    }
+
+    @Test
+    void shouldThrowOnBoundsViolation() throws Exception {
+        createStream();
+        final byte[] buf = new byte[5];
+        assertThatThrownBy(() -> stream.write(buf, 3, 5))
+                .isInstanceOf(IndexOutOfBoundsException.class);
+    }
+
+    @ParameterizedTest
+    @MethodSource("closedWriteOperations")
+    void shouldThrowOnWriteAfterClose(final WriteOperation operation) throws Exception {
+        createStream();
+        stream.close();
+        assertThatThrownBy(() -> operation.execute(stream)).isInstanceOf(IOException.class);
+    }
+
+    /** An operation on the stream under test that is allowed to throw a checked exception. */
+    @FunctionalInterface
+    interface WriteOperation {
+        void execute(ObjectStorageOutputStream stream) throws Exception;
+    }
+
+    /** Operations that {@link #shouldThrowOnWriteAfterClose} expects to fail on a closed stream. */
+    private static Stream<Arguments> closedWriteOperations() {
+        return Stream.of(
+                Arguments.of((WriteOperation) s -> s.write(42)),
+                Arguments.of((WriteOperation) s -> s.write(new byte[] {1}, 0, 1)),
+                Arguments.of((WriteOperation) ObjectStorageOutputStream::flush),
+                Arguments.of((WriteOperation) ObjectStorageOutputStream::getPos));
+    }
+
+    // --- Write content verification ---
+
+    @Test
+    void shouldDelegateWritesToDelegateStream() throws Exception {
+        final RecordingOutputStream recordingOut = new RecordingOutputStream();
+        createStream(recordingOut);
+
+        final byte[] data = {1, 2, 3};
+        stream.write(data, 0, data.length);
+        stream.close();
+
+        assertThat(recordingOut.getWrittenData()).isEqualTo(data);
+    }
+
+    @Test
+    void shouldDelegateSingleByteWritesToDelegateStream() throws Exception {
+        final RecordingOutputStream recordingOut = new RecordingOutputStream();
+        createStream(recordingOut);
+
+        stream.write(42);
+        stream.write(99);
+        stream.close();
+
+        assertThat(recordingOut.getWrittenData()).isEqualTo(new byte[] {42, 99});
+    }
+
+    // --- Flush and sync ---
+
+    @Test
+    void shouldFlushWithoutError() throws Exception {
+        createStream();
+        stream.write(1);
+        stream.flush();
+    }
+
+    @Test
+    void shouldFlushDelegateToDelegateStream() throws Exception {
+        final RecordingOutputStream recordingOut = new RecordingOutputStream();
+        createStream(recordingOut);
+
+        stream.write(1);
+        stream.flush();
+
+        assertThat(recordingOut.flushed).isTrue();
+    }
+
+    @Test
+    void shouldSyncFlushAndCloseDelegateStream() throws Exception {
+        final RecordingOutputStream recordingOut = new RecordingOutputStream();
+        createStream(recordingOut);
+
+        stream.write(1);
+        stream.sync();
+
+        assertThat(recordingOut.flushed).isTrue();
+        assertThat(recordingOut.closed).isTrue();
+        assertThat(recordingOut.operations).containsExactly("flush", "close");
+    }
+
+    @Test
+    void shouldSyncBeIdempotent() throws Exception {
+        final RecordingOutputStream recordingOut = new RecordingOutputStream();
+        createStream(recordingOut);
+
+        stream.write(1);
+        stream.sync();
+        stream.sync(); // second call is a no-op
+
+        assertThat(recordingOut.operations).containsExactly("flush", "close");
+    }
+
+    @Test
+    void shouldThrowOnWriteAfterSync() throws Exception {
+        createStream();
+        stream.write(1);
+        stream.sync();
+
+        assertThatThrownBy(() -> stream.write(42)).isInstanceOf(IOException.class);
+        assertThatThrownBy(() -> stream.flush()).isInstanceOf(IOException.class);
+        assertThatThrownBy(() -> stream.getPos()).isInstanceOf(IOException.class);
+    }
+
+    @Test
+    void shouldCloseBeNoOpAfterSync() throws Exception {
+        final RecordingOutputStream recordingOut = new RecordingOutputStream();
+        createStream(recordingOut);
+
+        stream.write(1);
+        stream.sync();
+        stream.close(); // should be no-op — already closed by sync
+
+        assertThat(recordingOut.operations).containsExactly("flush", "close");
+    }
+
+    // --- Close ---
+
+    @Test
+    void shouldCloseDelegateStreamOnClose() throws Exception {
+        final RecordingOutputStream recordingOut = new RecordingOutputStream();
+        createStream(recordingOut);
+
+        stream.write(1);
+        stream.close();
+
+        assertThat(recordingOut.closed).isTrue();
+    }
+
+    @Test
+    void shouldFlushBeforeClosingDelegateStream() throws Exception {
+        final RecordingOutputStream recordingOut = new RecordingOutputStream();
+        createStream(recordingOut);
+
+        stream.write(1);
+        stream.close();
+
+        assertThat(recordingOut.operations).containsExactly("flush", "close");
+    }
+
+    @Test
+    void shouldAllowCloseFromDifferentThread() throws Exception {
+        final RecordingOutputStream recordingOut = new RecordingOutputStream();
+        createStream(recordingOut);
+        stream.write(1);
+
+        // Capture the failure instead of rethrowing it on the helper thread, where it would
+        // only reach the uncaught-exception handler and never fail this test.
+        final AtomicReference<Throwable> closeError = new AtomicReference<>();
+        final Thread closer =
+                new Thread(
+                        () -> {
+                            try {
+                                stream.close();
+                            } catch (final Throwable t) {
+                                closeError.set(t);
+                            }
+                        });
+        closer.start();
+        closer.join();
+
+        assertThat(closeError.get()).isNull();
+        assertThat(recordingOut.closed).isTrue();
+    }
+
+    @Test
+    void shouldHandleDoubleCloseGracefully() throws Exception {
+        final RecordingOutputStream recordingOut = new RecordingOutputStream();
+        createStream(recordingOut);
+        stream.write(1);
+        stream.close();
+        stream.close(); // should not throw or close the delegate again
+        assertThat(recordingOut.closed).isTrue();
+        // exactly one flush/close pair proves the second close() never reached the delegate
+        assertThat(recordingOut.operations).containsExactly("flush", "close");
+    }
+
+    @Test
+    void shouldCloseEmptyStreamWithoutError() throws Exception {
+        final RecordingOutputStream recordingOut = new RecordingOutputStream();
+        createStream(recordingOut);
+        stream.close();
+        assertThat(recordingOut.closed).isTrue();
+        assertThat(recordingOut.getWrittenData()).isEmpty();
+    }
+
+    @Test
+    void shouldCloseDelegateStreamEvenIfFlushThrows() throws Exception {
+        final IOException flushFailure = new IOException("flush failed");
+        final boolean[] closeCalled = {false};
+        final OutputStream failingStream =
+                new OutputStream() {
+                    @Override
+                    public void write(final int b) {}
+
+                    @Override
+                    public void flush() throws IOException {
+                        throw flushFailure;
+                    }
+
+                    @Override
+                    public void close() {
+                        closeCalled[0] = true;
+                    }
+                };
+
+        createStream(failingStream);
+        assertThatThrownBy(stream::close).isInstanceOf(IOException.class).isSameAs(flushFailure);
+        assertThat(closeCalled[0]).isTrue();
+    }
+
+    @Test
+    void shouldPropagateCloseException() throws Exception {
+        final IOException closeFailure = new IOException("close failed");
+        final OutputStream failingStream =
+                new OutputStream() {
+                    @Override
+                    public void write(final int b) {}
+
+                    @Override
+                    public void close() throws IOException {
+                        throw closeFailure;
+                    }
+                };
+
+        createStream(failingStream);
+        assertThatThrownBy(stream::close).isInstanceOf(IOException.class).isSameAs(closeFailure);
+    }
+
+    @Test
+    void shouldChainCloseExceptionAsSuppressedWhenFlushAlsoFails() throws Exception {
+        final IOException flushFailure = new IOException("flush failed");
+        final IOException closeFailure = new IOException("close failed");
+        final OutputStream failingStream =
+                new OutputStream() {
+                    @Override
+                    public void write(final int b) {}
+
+                    @Override
+                    public void flush() throws IOException {
+                        throw flushFailure;
+                    }
+
+                    @Override
+                    public void close() throws IOException {
+                        throw closeFailure;
+                    }
+                };
+
+        createStream(failingStream);
+        // Throwable does not override equals(), so containsExactly is an identity check here.
+        assertThatThrownBy(stream::close)
+                .isInstanceOf(IOException.class)
+                .isSameAs(flushFailure)
+                .satisfies(ex -> assertThat(ex.getSuppressed()).containsExactly(closeFailure));
+    }
+
+    // --- Thread-safety ---
+
+    /**
+     * Verifies that {@code close()} blocks until a concurrent write completes.
+     *
+     * <p>A blocking write holds the lock; close() must wait until the write releases it.
+     */
+    @Test
+    void shouldBlockConcurrentWriteDuringClose() throws Exception {
+        final CountDownLatch writeStarted = new CountDownLatch(1);
+        final CountDownLatch writePermit = new CountDownLatch(1);
+        final RecordingOutputStream recordingOut = new RecordingOutputStream();
+        final OutputStream blockingDelegate =
+                new OutputStream() {
+                    @Override
+                    public void write(final int b) throws IOException {
+                        writeStarted.countDown();
+                        try {
+                            writePermit.await();
+                        } catch (final InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new IOException(e);
+                        }
+                        recordingOut.write(b);
+                    }
+
+                    @Override
+                    public void flush() throws IOException {
+                        recordingOut.flush();
+                    }
+
+                    @Override
+                    public void close() throws IOException {
+                        recordingOut.close();
+                    }
+                };
+
+        createStream(blockingDelegate);
+
+        final AtomicReference<Throwable> writeError = new AtomicReference<>();
+        final Thread writer =
+                new Thread(
+                        () -> {
+                            try {
+                                stream.write(1);
+                            } catch (final Throwable t) {
+                                writeError.set(t);
+                            }
+                        });
+
+        final CountDownLatch closeEntered = new CountDownLatch(1);
+        final AtomicReference<Throwable> closeError = new AtomicReference<>();
+        final Thread closer =
+                new Thread(
+                        () -> {
+                            closeEntered.countDown();
+                            try {
+                                stream.close();
+                            } catch (final Throwable t) {
+                                closeError.set(t);
+                            }
+                        });
+
+        writer.start();
+        try {
+            // Wait until the write has the lock and is blocked inside the delegate.
+            writeStarted.await();
+
+            // close() must wait for the write to finish.
+            closer.start();
+            closeEntered.await();
+
+            // Writer is still blocked inside the delegate — nothing written yet, and close()
+            // must not have reached the delegate while the write is in flight.
+            assertThat(recordingOut.getWrittenData()).isEmpty();
+            assertThat(recordingOut.closed).isFalse();
+        } finally {
+            // Allow the write to complete, then close() should proceed. Done in finally so a
+            // failed assertion cannot leave the writer thread parked forever.
+            writePermit.countDown();
+        }
+
+        writer.join();
+        closer.join();
+
+        assertThat(writeError.get()).isNull();
+        assertThat(closeError.get()).isNull();
+        assertThat(recordingOut.closed).isTrue();
+        assertThat(recordingOut.getWrittenData()).isEqualTo(new byte[] {1});
+    }
+
+    /**
+     * Verifies that setting the interrupt flag before calling {@link
+     * ObjectStorageOutputStream#write(int)} causes an {@link IOException} wrapping {@link
+     * InterruptedException}.
+     */
+    @Test
+    void shouldInterruptWriteOnInterrupt() throws Exception {
+        createStream();
+
+        Thread.currentThread().interrupt();
+        try {
+            assertThatThrownBy(() -> stream.write(1))
+                    .isInstanceOf(IOException.class)
+                    .hasMessageContaining("Interrupted")
+                    .hasCauseInstanceOf(InterruptedException.class);
+        } finally {
+            // Clear interrupt flag to avoid contaminating subsequent tests.
+            Thread.interrupted();
+        }
+    }
+}