3232import com .google .storage .v2 .BidiReadObjectRequest ;
3333import com .google .storage .v2 .BidiReadObjectResponse ;
3434import java .io .IOException ;
35+ import java .io .InterruptedIOException ;
3536import java .nio .channels .ScatteringByteChannel ;
37+ import java .util .ArrayList ;
38+ import java .util .IdentityHashMap ;
39+ import java .util .Iterator ;
40+ import java .util .Map .Entry ;
41+ import java .util .concurrent .ExecutionException ;
3642import java .util .concurrent .ScheduledExecutorService ;
43+ import java .util .concurrent .locks .ReentrantLock ;
44+ import org .checkerframework .checker .lock .qual .GuardedBy ;
3745
3846final class BlobDescriptorImpl implements BlobDescriptor {
3947
48+ private final ScheduledExecutorService executor ;
49+ private final ZeroCopyBidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse >
50+ callable ;
4051 private final BlobDescriptorStream stream ;
4152 @ VisibleForTesting final BlobDescriptorState state ;
4253 private final BlobInfo info ;
4354 private final RetryContextProvider retryContextProvider ;
4455
56+ @ GuardedBy ("this.lock" )
57+ private final IdentityHashMap <BlobDescriptorStream , BlobDescriptorState > children ;
58+
59+ private final ReentrantLock lock ;
60+
61+ @ GuardedBy ("this.lock" )
62+ private volatile boolean open ;
63+
4564 private BlobDescriptorImpl (
65+ ScheduledExecutorService executor ,
66+ ZeroCopyBidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > callable ,
4667 BlobDescriptorStream stream ,
4768 BlobDescriptorState state ,
4869 RetryContextProvider retryContextProvider ) {
70+ this .executor = executor ;
71+ this .callable = callable ;
4972 this .stream = stream ;
5073 this .state = state ;
5174 this .info = Conversions .grpc ().blobInfo ().decode (state .getMetadata ());
5275 this .retryContextProvider = retryContextProvider ;
76+ this .children = new IdentityHashMap <>();
77+ this .lock = new ReentrantLock ();
78+ this .open = true ;
5379 }
5480
5581 @ Override
5682 public ApiFuture <byte []> readRangeAsBytes (RangeSpec range ) {
57- checkState (stream .isOpen (), "stream already closed" );
58- long readId = state .newReadId ();
59- SettableApiFuture <byte []> future = SettableApiFuture .create ();
60- AccumulatingRead <byte []> read =
61- BlobDescriptorStreamRead .createByteArrayAccumulatingRead (
62- readId , range , retryContextProvider .create (), future );
63- BidiReadObjectRequest request =
64- BidiReadObjectRequest .newBuilder ().addReadRanges (read .makeReadRange ()).build ();
65- state .putOutstandingRead (readId , read );
66- stream .send (request );
67- return future ;
83+ lock .lock ();
84+ try {
85+ checkState (open , "stream already closed" );
86+ long readId = state .newReadId ();
87+ SettableApiFuture <byte []> future = SettableApiFuture .create ();
88+ AccumulatingRead <byte []> read =
89+ BlobDescriptorStreamRead .createByteArrayAccumulatingRead (
90+ readId , range , retryContextProvider .create (), future );
91+ registerReadInState (readId , read );
92+ return future ;
93+ } finally {
94+ lock .unlock ();
95+ }
6896 }
6997
7098 @ Override
7199 public ScatteringByteChannel readRangeAsChannel (RangeSpec range ) {
72- checkState (stream .isOpen (), "stream already closed" );
73- long readId = state .newReadId ();
74- StreamingRead read =
75- BlobDescriptorStreamRead .streamingRead (readId , range , retryContextProvider .create ());
76- BidiReadObjectRequest request =
77- BidiReadObjectRequest .newBuilder ().addReadRanges (read .makeReadRange ()).build ();
78- // todo: fork the stream and state
79- state .putOutstandingRead (readId , read );
80- stream .send (request );
81- return read ;
100+ lock .lock ();
101+ try {
102+ checkState (open , "stream already closed" );
103+ long readId = state .newReadId ();
104+ StreamingRead read =
105+ BlobDescriptorStreamRead .streamingRead (readId , range , retryContextProvider .create ());
106+ registerReadInState (readId , read );
107+ return read ;
108+ } finally {
109+ lock .unlock ();
110+ }
82111 }
83112
84113 public ApiFuture <DisposableByteString > readRangeAsByteString (RangeSpec range ) {
85- checkState (stream .isOpen (), "stream already closed" );
86- long readId = state .newReadId ();
87- SettableApiFuture <DisposableByteString > future = SettableApiFuture .create ();
88- AccumulatingRead <DisposableByteString > read =
89- BlobDescriptorStreamRead .createZeroCopyByteStringAccumulatingRead (
90- readId , range , future , retryContextProvider .create ());
91- BidiReadObjectRequest request =
92- BidiReadObjectRequest .newBuilder ().addReadRanges (read .makeReadRange ()).build ();
93- state .putOutstandingRead (readId , read );
94- stream .send (request );
95- return future ;
114+ lock .lock ();
115+ try {
116+ checkState (open , "stream already closed" );
117+ long readId = state .newReadId ();
118+ SettableApiFuture <DisposableByteString > future = SettableApiFuture .create ();
119+ AccumulatingRead <DisposableByteString > read =
120+ BlobDescriptorStreamRead .createZeroCopyByteStringAccumulatingRead (
121+ readId , range , retryContextProvider .create (), future );
122+ registerReadInState (readId , read );
123+ return future ;
124+ } finally {
125+ lock .unlock ();
126+ }
96127 }
97128
98129 @ Override
@@ -102,7 +133,49 @@ public BlobInfo getBlobInfo() {
102133
103134 @ Override
104135 public void close () throws IOException {
105- stream .close ();
136+ open = false ;
137+ lock .lock ();
138+ try {
139+ Iterator <Entry <BlobDescriptorStream , BlobDescriptorState >> it =
140+ children .entrySet ().iterator ();
141+ ArrayList <ApiFuture <Void >> closing = new ArrayList <>(children .size ());
142+ while (it .hasNext ()) {
143+ Entry <BlobDescriptorStream , BlobDescriptorState > next = it .next ();
144+ BlobDescriptorStream subStream = next .getKey ();
145+ it .remove ();
146+ closing .add (subStream .closeAsync ());
147+ }
148+ stream .close ();
149+ ApiFutures .allAsList (closing ).get ();
150+ } catch (ExecutionException e ) {
151+ throw new IOException (e .getCause ());
152+ } catch (InterruptedException e ) {
153+ Thread .currentThread ().interrupt ();
154+ throw new InterruptedIOException ();
155+ } finally {
156+ lock .unlock ();
157+ }
158+ }
159+
160+ private void registerReadInState (long readId , BlobDescriptorStreamRead read ) {
161+ BidiReadObjectRequest request =
162+ BidiReadObjectRequest .newBuilder ().addReadRanges (read .makeReadRange ()).build ();
163+ if (state .canHandleNewRead (read )) {
164+ state .putOutstandingRead (readId , read );
165+ stream .send (request );
166+ } else {
167+ BlobDescriptorState child = state .forkChild ();
168+ BlobDescriptorStream newStream =
169+ BlobDescriptorStream .create (executor , callable , child , retryContextProvider .create ());
170+ children .put (newStream , child );
171+ read .setOnCloseCallback (
172+ () -> {
173+ children .remove (newStream );
174+ newStream .close ();
175+ });
176+ child .putOutstandingRead (readId , read );
177+ newStream .send (request );
178+ }
106179 }
107180
108181 static ApiFuture <BlobDescriptor > create (
@@ -119,7 +192,8 @@ static ApiFuture<BlobDescriptor> create(
119192 ApiFuture <BlobDescriptor > blobDescriptorFuture =
120193 ApiFutures .transform (
121194 stream ,
122- nowOpen -> new BlobDescriptorImpl (stream , state , retryContextProvider ),
195+ nowOpen ->
196+ new BlobDescriptorImpl (executor , callable , stream , state , retryContextProvider ),
123197 executor );
124198 stream .send (openRequest );
125199 return StorageException .coalesceAsync (blobDescriptorFuture );
0 commit comments