@@ -35,32 +35,23 @@ abstract class BlobDescriptorStreamRead implements AutoCloseable, Closeable {
3535
3636 protected final long readId ;
3737 protected final RangeSpec rangeSpec ;
38- protected final List <ChildRef > childRefs ;
3938 protected final RetryContext retryContext ;
4039 protected final AtomicLong readOffset ;
4140 protected boolean closed ;
4241 protected boolean tombstoned ;
4342
4443 BlobDescriptorStreamRead (long readId , RangeSpec rangeSpec , RetryContext retryContext ) {
45- this (
46- readId ,
47- rangeSpec ,
48- Collections .synchronizedList (new ArrayList <>()),
49- new AtomicLong (rangeSpec .begin ()),
50- retryContext ,
51- false );
44+ this (readId , rangeSpec , new AtomicLong (rangeSpec .begin ()), retryContext , false );
5245 }
5346
5447 BlobDescriptorStreamRead (
5548 long readId ,
5649 RangeSpec rangeSpec ,
57- List <ChildRef > childRefs ,
5850 AtomicLong readOffset ,
5951 RetryContext retryContext ,
6052 boolean closed ) {
6153 this .readId = readId ;
6254 this .rangeSpec = rangeSpec ;
63- this .childRefs = childRefs ;
6455 this .retryContext = retryContext ;
6556 this .readOffset = readOffset ;
6657 this .closed = closed ;
@@ -99,17 +90,15 @@ final ReadRange makeReadRange() {
9990 }
10091
10192 @ Override
102- public void close () throws IOException {
103- if (!closed ) {
104- retryContext .reset ();
105- closed = true ;
106- GrpcUtils .closeAll (childRefs );
107- }
108- }
93+ public abstract void close () throws IOException ;
10994
110- abstract <T extends Throwable > void recordError (T t , OnSuccess onSuccess , OnFailure <T > onFailure );
95+ <T extends Throwable > void recordError (T t , OnSuccess onSuccess , OnFailure <T > onFailure ) {
96+ retryContext .recordError (t , onSuccess , onFailure );
97+ }
11198
112- public abstract boolean readyToSend ();
99+ boolean readyToSend () {
100+ return !tombstoned && !retryContext .inBackoff ();
101+ }
113102
114103 static AccumulatingRead <byte []> createByteArrayAccumulatingRead (
115104 long readId ,
@@ -129,6 +118,7 @@ static ZeroCopyByteStringAccumulatingRead createZeroCopyByteStringAccumulatingRe
129118
130119 /** Base class of a read that will accumulate before completing by resolving a future */
131120 abstract static class AccumulatingRead <Result > extends BlobDescriptorStreamRead {
121+ protected final List <ChildRef > childRefs ;
132122 protected final SettableApiFuture <Result > complete ;
133123
134124 private AccumulatingRead (
@@ -138,6 +128,7 @@ private AccumulatingRead(
138128 SettableApiFuture <Result > complete ) {
139129 super (readId , rangeSpec , retryContext );
140130 this .complete = complete ;
131+ this .childRefs = Collections .synchronizedList (new ArrayList <>());
141132 }
142133
143134 private AccumulatingRead (
@@ -148,7 +139,8 @@ private AccumulatingRead(
148139 RetryContext retryContext ,
149140 boolean closed ,
150141 SettableApiFuture <Result > complete ) {
151- super (readId , rangeSpec , childRefs , readOffset , retryContext , closed );
142+ super (readId , rangeSpec , readOffset , retryContext , closed );
143+ this .childRefs = childRefs ;
152144 this .complete = complete ;
153145 }
154146
@@ -157,6 +149,14 @@ boolean acceptingBytes() {
157149 return !complete .isDone () && !tombstoned ;
158150 }
159151
152+ @ Override
153+ void accept (ChildRef childRef ) throws IOException {
154+ retryContext .reset ();
155+ int size = childRef .byteString ().size ();
156+ childRefs .add (childRef );
157+ readOffset .addAndGet (size );
158+ }
159+
160160 @ Override
161161 ApiFuture <?> fail (Throwable t ) {
162162 try {
@@ -171,13 +171,12 @@ ApiFuture<?> fail(Throwable t) {
171171 }
172172
173173 @ Override
174- <T extends Throwable > void recordError (T t , OnSuccess onSuccess , OnFailure <T > onFailure ) {
175- retryContext .recordError (t , onSuccess , onFailure );
176- }
177-
178- @ Override
179- public boolean readyToSend () {
180- return !tombstoned && !retryContext .inBackoff ();
174+ public void close () throws IOException {
175+ if (!closed ) {
176+ retryContext .reset ();
177+ closed = true ;
178+ GrpcUtils .closeAll (childRefs );
179+ }
181180 }
182181 }
183182
@@ -193,11 +192,10 @@ private StreamingRead(long readId, RangeSpec range, RetryContext retryContext) {
193192 private StreamingRead (
194193 long readId ,
195194 RangeSpec rangeSpec ,
196- List <ChildRef > childRefs ,
197195 AtomicLong readOffset ,
198196 RetryContext retryContext ,
199197 boolean closed ) {
200- super (readId , rangeSpec , childRefs , readOffset , retryContext , closed );
198+ super (readId , rangeSpec , readOffset , retryContext , closed );
201199 }
202200 }
203201
@@ -222,14 +220,6 @@ private ByteArrayAccumulatingRead(
222220 super (readId , rangeSpec , childRefs , readOffset , retryContext , closed , complete );
223221 }
224222
225- @ Override
226- void accept (ChildRef childRef ) throws IOException {
227- retryContext .reset ();
228- int size = childRef .byteString ().size ();
229- childRefs .add (childRef );
230- readOffset .addAndGet (size );
231- }
232-
233223 @ Override
234224 void eof () throws IOException {
235225 retryContext .reset ();
@@ -283,14 +273,6 @@ public ByteString byteString() {
283273 return byteString ;
284274 }
285275
286- @ Override
287- void accept (ChildRef childRef ) throws IOException {
288- retryContext .reset ();
289- int size = childRef .byteString ().size ();
290- childRefs .add (childRef );
291- readOffset .addAndGet (size );
292- }
293-
294276 @ Override
295277 void eof () throws IOException {
296278 retryContext .reset ();
0 commit comments