Skip to content

Commit cc51021

Browse files
Novotnik, PetrDavid Moravek
authored andcommitted
apache#51 Do _not_ store storage-provider in states by default
1 parent 75d508f commit cc51021

File tree

10 files changed

+36
-60
lines changed

10 files changed

+36
-60
lines changed

sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ private class JoinState
318318

319319
@SuppressWarnings("unchecked")
320320
public JoinState(Context<OUT> context, StorageProvider storageProvider) {
321-
super(context, storageProvider);
321+
super(context);
322322
leftElements = storageProvider.getListStorage(LEFT_STATE_DESCR);
323323
rightElements = storageProvider.getListStorage(RIGHT_STATE_DESCR);
324324
}

sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ public State<E, E> apply(Context<E> ctx, StorageProvider storageProvider) {
310310
CombiningReduceState(Context<E> context,
311311
StorageProvider storageProvider,
312312
CombinableReduceFunction<E> reducer) {
313-
super(context, storageProvider);
313+
super(context);
314314
this.reducer = Objects.requireNonNull(reducer);
315315

316316
@SuppressWarnings("unchecked")
@@ -373,7 +373,7 @@ static final class Factory<VALUE, OUT>
373373
NonCombiningReduceState(Context<OUT> context,
374374
StorageProvider storageProvider,
375375
ReduceFunction<VALUE, OUT> reducer) {
376-
super(context, storageProvider);
376+
super(context);
377377
this.reducer = Objects.requireNonNull(reducer);
378378

379379
@SuppressWarnings("unchecked")

sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private static final class MaxScored<V, C extends Comparable<C>>
5757

5858
@SuppressWarnings("unchecked")
5959
MaxScored(Context<Pair<V, C>> context, StorageProvider storageProvider) {
60-
super(context, storageProvider);
60+
super(context);
6161
curr = (ValueStorage) storageProvider.getValueStorage(MAX_STATE_DESCR);
6262
}
6363

sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/State.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ public abstract class State<IN, OUT> implements Closeable {
2626

2727
/** Collector of output of this state. */
2828
private final Context<OUT> context;
29-
/** Provider of state storage. */
30-
private final StorageProvider storageProvider;
3129

3230
/**
3331
* Add element to this state.
@@ -42,22 +40,15 @@ public abstract class State<IN, OUT> implements Closeable {
4240
*/
4341
public abstract void flush();
4442

45-
protected State(
46-
Context<OUT> context,
47-
StorageProvider storageProvider) {
43+
protected State(Context<OUT> context) {
4844

4945
this.context = context;
50-
this.storageProvider = storageProvider;
5146
}
5247

5348
public Context<OUT> getContext() {
5449
return context;
5550
}
5651

57-
public StorageProvider getStorageProvider() {
58-
return storageProvider;
59-
}
60-
6152
/**
6253
* Closes this state. Invoked after {@link #flush()} and before
6354
* this state gets disposed.

sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKeyTest.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.junit.Test;
3030

3131
import java.time.Duration;
32+
import java.util.Iterator;
3233

3334
import static org.junit.Assert.*;
3435

@@ -155,7 +156,7 @@ private static class WordCountState extends State<Long, Long> {
155156
protected WordCountState(
156157
Context<Long> context,
157158
StorageProvider storageProvider) {
158-
super(context, storageProvider);
159+
super(context);
159160
sum = storageProvider.getValueStorage(ValueStorageDescriptor.of(
160161
"sum", Long.class, 0L));
161162
}
@@ -171,17 +172,12 @@ public void flush() {
171172
}
172173

173174
static WordCountState combine(Iterable<WordCountState> others) {
174-
WordCountState state = null;
175-
for (WordCountState s : others) {
176-
if (state == null) {
177-
state = new WordCountState(
178-
s.getContext(),
179-
s.getStorageProvider());
180-
}
181-
state.add(s.sum.get());
175+
Iterator<WordCountState> iter = others.iterator();
176+
WordCountState target = iter.next();
177+
while (iter.hasNext()) {
178+
target.add(iter.next().sum.get());
182179
}
183-
184-
return state;
180+
return target;
185181
}
186182

187183
@Override

sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RSBKWindowingTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ private static class AccState<VALUE> extends State<VALUE, VALUE> {
5151
AccState(Context<VALUE> context,
5252
StorageProvider storageProvider)
5353
{
54-
super(context, storageProvider);
54+
super(context);
5555
reducableValues = storageProvider.getListStorage(
5656
ListStorageDescriptor.of("vals", (Class) Object.class));
5757
}

sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/InMemExecutorTest.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.Collections;
5959
import java.util.Comparator;
6060
import java.util.HashMap;
61+
import java.util.Iterator;
6162
import java.util.List;
6263
import java.util.Map;
6364
import java.util.Set;
@@ -276,7 +277,7 @@ public static class SortState extends State<Integer, Integer> {
276277
Context<Integer> c,
277278
StorageProvider storageProvider) {
278279

279-
super(c, storageProvider);
280+
super(c);
280281
data = storageProvider.getListStorage(
281282
ListStorageDescriptor.of("data", Integer.class));
282283
}
@@ -297,16 +298,12 @@ public void flush() {
297298
}
298299

299300
static SortState combine(Iterable<SortState> others) {
300-
SortState ret = null;
301-
for (SortState s : others) {
302-
if (ret == null) {
303-
ret = new SortState(
304-
s.getContext(),
305-
s.getStorageProvider());
306-
}
307-
ret.data.addAll(s.data.get());
301+
Iterator<SortState> iter = others.iterator();
302+
SortState target = iter.next();
303+
while (iter.hasNext()) {
304+
target.data.addAll(iter.next().data.get());
308305
}
309-
return ret;
306+
return target;
310307
}
311308

312309
@Override

sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.Comparator;
5757
import java.util.HashMap;
5858
import java.util.HashSet;
59+
import java.util.Iterator;
5960
import java.util.List;
6061
import java.util.Map;
6162
import java.util.stream.Collectors;
@@ -539,7 +540,7 @@ static class SumState extends State<Integer, Integer> {
539540
private final ValueStorage<Integer> sum;
540541

541542
SumState(Context<Integer> context, StorageProvider storageProvider) {
542-
super(context, storageProvider);
543+
super(context);
543544
sum = storageProvider.getValueStorage(
544545
ValueStorageDescriptor.of("sum-state", Integer.class, 0));
545546
}
@@ -560,12 +561,10 @@ public void close() {
560561
}
561562

562563
static SumState combine(Iterable<SumState> states) {
563-
SumState target = null;
564-
for (SumState state : states) {
565-
if (target == null) {
566-
target = new SumState(state.getContext(), state.getStorageProvider());
567-
}
568-
target.add(state.sum.get());
564+
Iterator<SumState> iter = states.iterator();
565+
SumState target = iter.next();
566+
while (iter.hasNext()) {
567+
target.add(iter.next().sum.get());
569568
}
570569
return target;
571570
}

sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceStateByKeyTest.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,8 @@ static class SortState extends State<Integer, Integer> {
8383

8484
final ListStorage<Integer> data;
8585

86-
SortState(
87-
Context<Integer> c,
88-
StorageProvider storageProvider) {
89-
90-
super(c, storageProvider);
86+
SortState(Context<Integer> c, StorageProvider storageProvider) {
87+
super(c);
9188
this.data = storageProvider.getListStorage(
9289
ListStorageDescriptor.of("data", Integer.class));
9390
}
@@ -112,16 +109,12 @@ public void close() {
112109
}
113110

114111
static SortState combine(Iterable<SortState> states) {
115-
SortState ret = null;
116-
for (SortState state : states) {
117-
if (ret == null) {
118-
ret = new SortState(
119-
state.getContext(),
120-
state.getStorageProvider());
121-
}
122-
ret.data.addAll(state.data.get());
112+
Iterator<SortState> iter = states.iterator();
113+
SortState target = iter.next();
114+
while (iter.hasNext()) {
115+
target.data.addAll(iter.next().data.get());
123116
}
124-
return ret;
117+
return target;
125118
}
126119

127120
}
@@ -249,7 +242,7 @@ private static class CountState extends State<String, Long> {
249242
final ValueStorage<Long> count;
250243
CountState(Context<Long> context, StorageProvider storageProvider)
251244
{
252-
super(context, storageProvider);
245+
super(context);
253246
this.count = storageProvider.getValueStorage(
254247
ValueStorageDescriptor.of("count-state", Long.class, 0L));
255248
}
@@ -351,7 +344,7 @@ private static class AccState<VALUE> extends State<VALUE, VALUE> {
351344
AccState(Context<VALUE> context,
352345
StorageProvider storageProvider)
353346
{
354-
super(context, storageProvider);
347+
super(context);
355348
vals = storageProvider.getListStorage(
356349
ListStorageDescriptor.of("vals", (Class) Object.class));
357350
}

sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/WindowingTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ private static class DistinctState extends State<Object, Object> {
208208
private final ValueStorage<Object> storage;
209209

210210
public DistinctState(Context<Object> context, StorageProvider storageProvider) {
211-
super(context, storageProvider);
211+
super(context);
212212
this.storage = storageProvider.getValueStorage(
213213
ValueStorageDescriptor.of("element", Object.class, null));
214214
}

0 commit comments

Comments
 (0)