Skip to content

Commit 8e9b0d8

Browse files
authored
change mongo scans to filter client-side (#347)
1 parent 5de6196 commit 8e9b0d8

38 files changed

+336
-164
lines changed

kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraClient.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.ScheduledExecutorService;
3737
import java.util.concurrent.ScheduledThreadPoolExecutor;
3838
import java.util.function.BooleanSupplier;
39+
import org.apache.kafka.common.utils.Bytes;
3940

4041
/**
4142
* {@code CassandraClient} wraps a {@link CqlSession} with utility methods
@@ -49,10 +50,10 @@ public class CassandraClient {
4950
private final CqlSession session;
5051

5152
private final ResponsiveConfig config;
52-
private final TableCache<RemoteKVTable<BoundStatement>> kvFactory;
53-
private final TableCache<RemoteKVTable<BoundStatement>> factFactory;
53+
private final TableCache<Bytes, Integer, RemoteKVTable<BoundStatement>> kvFactory;
54+
private final TableCache<Bytes, Integer, RemoteKVTable<BoundStatement>> factFactory;
5455
private final WindowedTableCache<RemoteWindowedTable<BoundStatement>> windowedFactory;
55-
private final TableCache<CassandraFactTable> globalFactory;
56+
private final TableCache<Bytes, Integer, CassandraFactTable> globalFactory;
5657

5758
/**
5859
* @param session the Cassandra session, expected to be initialized
@@ -178,15 +179,15 @@ public void shutdown() {
178179
session.close();
179180
}
180181

181-
public TableCache<CassandraFactTable> globalFactory() {
182+
public TableCache<Bytes, Integer, CassandraFactTable> globalFactory() {
182183
return globalFactory;
183184
}
184185

185-
public TableCache<RemoteKVTable<BoundStatement>> kvFactory() {
186+
public TableCache<Bytes, Integer, RemoteKVTable<BoundStatement>> kvFactory() {
186187
return kvFactory;
187188
}
188189

189-
public TableCache<RemoteKVTable<BoundStatement>> factFactory() {
190+
public TableCache<Bytes, Integer, RemoteKVTable<BoundStatement>> factFactory() {
190191
return factFactory;
191192
}
192193

kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactFlushManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,14 @@ public class CassandraFactFlushManager extends KVFlushManager {
3434
public CassandraFactFlushManager(
3535
final CassandraFactTable table,
3636
final CassandraClient client,
37-
final int kafkaPartition
37+
final int kafkaPartition,
38+
final TablePartitioner<Bytes, Integer> partitioner
3839
) {
3940
this.table = table;
4041
this.client = client;
4142
this.kafkaPartition = kafkaPartition;
43+
this.partitioner = partitioner;
4244

43-
partitioner = TablePartitioner.defaultPartitioner();
4445
logPrefix = String.format("%s[%d] fact-store", table.name(), kafkaPartition);
4546
}
4647

kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
3333
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
3434
import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
35+
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
3536
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
3637
import java.nio.ByteBuffer;
3738
import java.time.Instant;
@@ -56,6 +57,7 @@ public class CassandraFactTable implements RemoteKVTable<BoundStatement> {
5657
private final PreparedStatement delete;
5758
private final PreparedStatement fetchOffset;
5859
private final PreparedStatement setOffset;
60+
private final TablePartitioner<Bytes, Integer> partitioner;
5961

6062
public CassandraFactTable(
6163
final String name,
@@ -64,7 +66,8 @@ public CassandraFactTable(
6466
final PreparedStatement insert,
6567
final PreparedStatement delete,
6668
final PreparedStatement fetchOffset,
67-
final PreparedStatement setOffset
69+
final PreparedStatement setOffset,
70+
final TablePartitioner<Bytes, Integer> partitioner
6871
) {
6972
this.name = name;
7073
this.client = client;
@@ -73,10 +76,11 @@ public CassandraFactTable(
7376
this.delete = delete;
7477
this.fetchOffset = fetchOffset;
7578
this.setOffset = setOffset;
79+
this.partitioner = partitioner;
7680
}
7781

7882
public static CassandraFactTable create(
79-
final RemoteTableSpec spec,
83+
final RemoteTableSpec<Bytes, Integer> spec,
8084
final CassandraClient client
8185
) {
8286
final String name = spec.tableName();
@@ -161,7 +165,8 @@ public static CassandraFactTable create(
161165
insert,
162166
delete,
163167
fetchOffset,
164-
setOffset
168+
setOffset,
169+
spec.partitioner()
165170
);
166171
}
167172

@@ -193,7 +198,7 @@ public CassandraFactFlushManager init(
193198
.build()
194199
);
195200

196-
return new CassandraFactFlushManager(this, client, kafkaPartition);
201+
return new CassandraFactFlushManager(this, client, kafkaPartition, partitioner);
197202
}
198203

199204
@Override

kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public class CassandraKeyValueTable implements RemoteKVTable<BoundStatement> {
7777
private final PreparedStatement ensureEpoch;
7878

7979
public static CassandraKeyValueTable create(
80-
final RemoteTableSpec spec,
80+
final RemoteTableSpec<Bytes, Integer> spec,
8181
final CassandraClient client
8282
) throws InterruptedException, TimeoutException {
8383
final String name = spec.tableName();

kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public class CassandraWindowedTable implements RemoteWindowedTable<BoundStatemen
9797
private final PreparedStatement ensureEpoch;
9898

9999
public static CassandraWindowedTable create(
100-
final RemoteTableSpec spec,
100+
final RemoteTableSpec<WindowedKey, Segmenter.SegmentPartition> spec,
101101
final CassandraClient client,
102102
final WindowSegmentPartitioner partitioner
103103
) throws InterruptedException, TimeoutException {

kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoKVFlushManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,14 @@ public class MongoKVFlushManager extends KVFlushManager {
4444
public MongoKVFlushManager(
4545
final MongoKVTable table,
4646
final MongoCollection<KVDoc> kvDocs,
47-
final int kafkaPartition
47+
final int kafkaPartition,
48+
final TablePartitioner<Bytes, Integer> partitioner
4849
) {
4950
this.table = table;
5051
this.kvDocs = kvDocs;
5152
this.kafkaPartition = kafkaPartition;
53+
this.partitioner = partitioner;
5254

53-
partitioner = TablePartitioner.defaultPartitioner();
5455
logPrefix = String.format("%s[%d] kv-store {epoch=%d} ",
5556
table.name(), kafkaPartition, table.localEpoch(kafkaPartition));
5657
log = new LogContext(logPrefix).logger(MongoKVFlushManager.class);

kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTableSpecFactory.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,35 +40,36 @@
4040
*/
4141
public class RemoteTableSpecFactory {
4242

43-
public static RemoteTableSpec globalSpec(
43+
public static RemoteTableSpec<Bytes, Integer> globalSpec(
4444
final ResponsiveKeyValueParams params,
4545
final TablePartitioner<Bytes, Integer> partitioner
4646
) {
47-
return new GlobalTableSpec(new BaseTableSpec(params.name().tableName(), partitioner));
47+
return new GlobalTableSpec<>(new BaseTableSpec<>(params.name().tableName(), partitioner));
4848
}
4949

50-
public static RemoteTableSpec fromKVParams(
50+
public static RemoteTableSpec<Bytes, Integer> fromKVParams(
5151
final ResponsiveKeyValueParams params,
5252
final TablePartitioner<Bytes, Integer> partitioner
5353
) {
54-
RemoteTableSpec spec = new BaseTableSpec(params.name().tableName(), partitioner);
54+
RemoteTableSpec<Bytes, Integer> spec =
55+
new BaseTableSpec<>(params.name().tableName(), partitioner);
5556

5657
if (params.timeToLive().isPresent()) {
57-
spec = new TtlTableSpec(spec, params.timeToLive().get());
58+
spec = new TtlTableSpec<>(spec, params.timeToLive().get());
5859
}
5960

6061
if (params.schemaType() == SchemaTypes.KVSchema.FACT) {
61-
spec = new TimeWindowedCompactionTableSpec(spec);
62+
spec = new TimeWindowedCompactionTableSpec<>(spec);
6263
}
6364

6465
return spec;
6566
}
6667

67-
public static RemoteTableSpec fromWindowParams(
68+
public static RemoteTableSpec<WindowedKey, SegmentPartition> fromWindowParams(
6869
final ResponsiveWindowParams params,
6970
final TablePartitioner<WindowedKey, SegmentPartition> partitioner
7071
) {
71-
return new BaseTableSpec(params.name().tableName(), partitioner);
72+
return new BaseTableSpec<>(params.name().tableName(), partitioner);
7273
}
7374

7475
}

kafka-client/src/main/java/dev/responsive/kafka/internal/db/SessionTableCache.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616

1717
package dev.responsive.kafka.internal.db;
1818

19+
import dev.responsive.kafka.internal.db.partitioning.Segmenter;
1920
import dev.responsive.kafka.internal.db.partitioning.SessionSegmentPartitioner;
2021
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
22+
import dev.responsive.kafka.internal.utils.SessionKey;
2123
import java.util.HashMap;
2224
import java.util.Map;
2325
import java.util.concurrent.TimeoutException;
@@ -28,8 +30,10 @@ public class SessionTableCache<T extends RemoteTable<?, ?>> {
2830

2931
@FunctionalInterface
3032
public interface Factory<T> {
31-
T create(final RemoteTableSpec spec, SessionSegmentPartitioner partitioner)
32-
throws InterruptedException, TimeoutException;
33+
T create(
34+
final RemoteTableSpec<SessionKey, Segmenter.SegmentPartition> spec,
35+
SessionSegmentPartitioner partitioner
36+
) throws InterruptedException, TimeoutException;
3337
}
3438

3539
private final Map<String, T> tables = new HashMap<>();
@@ -43,7 +47,10 @@ public SessionTableCache(final Factory<T> factory) {
4347
* Creates a table with the supplied {@code tableName} with the
4448
* desired schema.
4549
*/
46-
public synchronized T create(RemoteTableSpec spec, SessionSegmentPartitioner partitioner)
50+
public synchronized T create(
51+
RemoteTableSpec<SessionKey, Segmenter.SegmentPartition> spec,
52+
SessionSegmentPartitioner partitioner
53+
)
4754
throws InterruptedException, TimeoutException {
4855
final T existing = tables.get(spec.tableName());
4956
if (existing != null) {

kafka-client/src/main/java/dev/responsive/kafka/internal/db/TableCache.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,26 +28,26 @@
2828
* table are only prepared once during the lifetime of the application.
2929
*/
3030
@ThreadSafe
31-
public class TableCache<T extends RemoteTable<?, ?>> {
31+
public class TableCache<K, V, T extends RemoteTable<?, ?>> {
3232

3333
@FunctionalInterface
34-
public interface Factory<T> {
35-
T create(final RemoteTableSpec spec)
34+
public interface Factory<K, V, T> {
35+
T create(final RemoteTableSpec<K, V> spec)
3636
throws InterruptedException, TimeoutException;
3737
}
3838

3939
private final Map<String, T> tables = new HashMap<>();
40-
private final Factory<T> factory;
40+
private final Factory<K, V, T> factory;
4141

42-
public TableCache(final Factory<T> factory) {
42+
public TableCache(final Factory<K, V, T> factory) {
4343
this.factory = factory;
4444
}
4545

4646
/**
4747
* Creates a table with the supplied {@code tableName} with the
4848
* desired schema.
4949
*/
50-
public synchronized T create(RemoteTableSpec spec)
50+
public synchronized T create(RemoteTableSpec<K, V> spec)
5151
throws InterruptedException, TimeoutException {
5252
final T existing = tables.get(spec.tableName());
5353
if (existing != null) {

kafka-client/src/main/java/dev/responsive/kafka/internal/db/WindowedTableCache.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616

1717
package dev.responsive.kafka.internal.db;
1818

19+
import dev.responsive.kafka.internal.db.partitioning.Segmenter;
1920
import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner;
2021
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
22+
import dev.responsive.kafka.internal.utils.WindowedKey;
2123
import java.util.HashMap;
2224
import java.util.Map;
2325
import java.util.concurrent.TimeoutException;
@@ -28,7 +30,10 @@ public class WindowedTableCache<T extends RemoteTable<?, ?>> {
2830

2931
@FunctionalInterface
3032
public interface Factory<T> {
31-
T create(final RemoteTableSpec spec, WindowSegmentPartitioner partitioner)
33+
T create(
34+
final RemoteTableSpec<WindowedKey, Segmenter.SegmentPartition> spec,
35+
WindowSegmentPartitioner partitioner
36+
)
3237
throws InterruptedException, TimeoutException;
3338
}
3439

@@ -43,7 +48,10 @@ public WindowedTableCache(final Factory<T> factory) {
4348
* Creates a table with the supplied {@code tableName} with the
4449
* desired schema.
4550
*/
46-
public synchronized T create(RemoteTableSpec spec, WindowSegmentPartitioner partitioner)
51+
public synchronized T create(
52+
RemoteTableSpec<WindowedKey, Segmenter.SegmentPartition> spec,
53+
WindowSegmentPartitioner partitioner
54+
)
4755
throws InterruptedException, TimeoutException {
4856
final T existing = tables.get(spec.tableName());
4957
if (existing != null) {

0 commit comments

Comments
 (0)