Skip to content

Commit a5e0346

Browse files
authored
Revert "change mongo scans to filter client-side (#347)" (#350)
This reverts commit 8e9b0d8 which removed the partition from the mongo value schema and used client-side filtering that computed the partition from the mongo key. It turns out that this approach won't actually work because the mongo key may not be the original record key that was used to compute the changelog partition. We therefore cannot use the mongo key to compute the partition.: - some dsl operators include a timestamp in the key - a user writing their own PAPI processor is free to construct their own key, which we cannot predict.
1 parent bad5edc commit a5e0346

38 files changed

+164
-336
lines changed

kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraClient.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import java.util.concurrent.ScheduledExecutorService;
3737
import java.util.concurrent.ScheduledThreadPoolExecutor;
3838
import java.util.function.BooleanSupplier;
39-
import org.apache.kafka.common.utils.Bytes;
4039

4140
/**
4241
* {@code CassandraClient} wraps a {@link CqlSession} with utility methods
@@ -50,10 +49,10 @@ public class CassandraClient {
5049
private final CqlSession session;
5150

5251
private final ResponsiveConfig config;
53-
private final TableCache<Bytes, Integer, RemoteKVTable<BoundStatement>> kvFactory;
54-
private final TableCache<Bytes, Integer, RemoteKVTable<BoundStatement>> factFactory;
52+
private final TableCache<RemoteKVTable<BoundStatement>> kvFactory;
53+
private final TableCache<RemoteKVTable<BoundStatement>> factFactory;
5554
private final WindowedTableCache<RemoteWindowedTable<BoundStatement>> windowedFactory;
56-
private final TableCache<Bytes, Integer, CassandraFactTable> globalFactory;
55+
private final TableCache<CassandraFactTable> globalFactory;
5756

5857
/**
5958
* @param session the Cassandra session, expected to be initialized
@@ -179,15 +178,15 @@ public void shutdown() {
179178
session.close();
180179
}
181180

182-
public TableCache<Bytes, Integer, CassandraFactTable> globalFactory() {
181+
public TableCache<CassandraFactTable> globalFactory() {
183182
return globalFactory;
184183
}
185184

186-
public TableCache<Bytes, Integer, RemoteKVTable<BoundStatement>> kvFactory() {
185+
public TableCache<RemoteKVTable<BoundStatement>> kvFactory() {
187186
return kvFactory;
188187
}
189188

190-
public TableCache<Bytes, Integer, RemoteKVTable<BoundStatement>> factFactory() {
189+
public TableCache<RemoteKVTable<BoundStatement>> factFactory() {
191190
return factFactory;
192191
}
193192

kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactFlushManager.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,13 @@ public class CassandraFactFlushManager extends KVFlushManager {
3434
public CassandraFactFlushManager(
3535
final CassandraFactTable table,
3636
final CassandraClient client,
37-
final int kafkaPartition,
38-
final TablePartitioner<Bytes, Integer> partitioner
37+
final int kafkaPartition
3938
) {
4039
this.table = table;
4140
this.client = client;
4241
this.kafkaPartition = kafkaPartition;
43-
this.partitioner = partitioner;
4442

43+
partitioner = TablePartitioner.defaultPartitioner();
4544
logPrefix = String.format("%s[%d] fact-store", table.name(), kafkaPartition);
4645
}
4746

kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
3333
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
3434
import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
35-
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
3635
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
3736
import java.nio.ByteBuffer;
3837
import java.time.Instant;
@@ -57,7 +56,6 @@ public class CassandraFactTable implements RemoteKVTable<BoundStatement> {
5756
private final PreparedStatement delete;
5857
private final PreparedStatement fetchOffset;
5958
private final PreparedStatement setOffset;
60-
private final TablePartitioner<Bytes, Integer> partitioner;
6159

6260
public CassandraFactTable(
6361
final String name,
@@ -66,8 +64,7 @@ public CassandraFactTable(
6664
final PreparedStatement insert,
6765
final PreparedStatement delete,
6866
final PreparedStatement fetchOffset,
69-
final PreparedStatement setOffset,
70-
final TablePartitioner<Bytes, Integer> partitioner
67+
final PreparedStatement setOffset
7168
) {
7269
this.name = name;
7370
this.client = client;
@@ -76,11 +73,10 @@ public CassandraFactTable(
7673
this.delete = delete;
7774
this.fetchOffset = fetchOffset;
7875
this.setOffset = setOffset;
79-
this.partitioner = partitioner;
8076
}
8177

8278
public static CassandraFactTable create(
83-
final RemoteTableSpec<Bytes, Integer> spec,
79+
final RemoteTableSpec spec,
8480
final CassandraClient client
8581
) {
8682
final String name = spec.tableName();
@@ -165,8 +161,7 @@ public static CassandraFactTable create(
165161
insert,
166162
delete,
167163
fetchOffset,
168-
setOffset,
169-
spec.partitioner()
164+
setOffset
170165
);
171166
}
172167

@@ -198,7 +193,7 @@ public CassandraFactFlushManager init(
198193
.build()
199194
);
200195

201-
return new CassandraFactFlushManager(this, client, kafkaPartition, partitioner);
196+
return new CassandraFactFlushManager(this, client, kafkaPartition);
202197
}
203198

204199
@Override

kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public class CassandraKeyValueTable implements RemoteKVTable<BoundStatement> {
7777
private final PreparedStatement ensureEpoch;
7878

7979
public static CassandraKeyValueTable create(
80-
final RemoteTableSpec<Bytes, Integer> spec,
80+
final RemoteTableSpec spec,
8181
final CassandraClient client
8282
) throws InterruptedException, TimeoutException {
8383
final String name = spec.tableName();

kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public class CassandraWindowedTable implements RemoteWindowedTable<BoundStatemen
9797
private final PreparedStatement ensureEpoch;
9898

9999
public static CassandraWindowedTable create(
100-
final RemoteTableSpec<WindowedKey, Segmenter.SegmentPartition> spec,
100+
final RemoteTableSpec spec,
101101
final CassandraClient client,
102102
final WindowSegmentPartitioner partitioner
103103
) throws InterruptedException, TimeoutException {

kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoKVFlushManager.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,13 @@ public class MongoKVFlushManager extends KVFlushManager {
4444
public MongoKVFlushManager(
4545
final MongoKVTable table,
4646
final MongoCollection<KVDoc> kvDocs,
47-
final int kafkaPartition,
48-
final TablePartitioner<Bytes, Integer> partitioner
47+
final int kafkaPartition
4948
) {
5049
this.table = table;
5150
this.kvDocs = kvDocs;
5251
this.kafkaPartition = kafkaPartition;
53-
this.partitioner = partitioner;
5452

53+
partitioner = TablePartitioner.defaultPartitioner();
5554
logPrefix = String.format("%s[%d] kv-store {epoch=%d} ",
5655
table.name(), kafkaPartition, table.localEpoch(kafkaPartition));
5756
log = new LogContext(logPrefix).logger(MongoKVFlushManager.class);

kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTableSpecFactory.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,36 +40,35 @@
4040
*/
4141
public class RemoteTableSpecFactory {
4242

43-
public static RemoteTableSpec<Bytes, Integer> globalSpec(
43+
public static RemoteTableSpec globalSpec(
4444
final ResponsiveKeyValueParams params,
4545
final TablePartitioner<Bytes, Integer> partitioner
4646
) {
47-
return new GlobalTableSpec<>(new BaseTableSpec<>(params.name().tableName(), partitioner));
47+
return new GlobalTableSpec(new BaseTableSpec(params.name().tableName(), partitioner));
4848
}
4949

50-
public static RemoteTableSpec<Bytes, Integer> fromKVParams(
50+
public static RemoteTableSpec fromKVParams(
5151
final ResponsiveKeyValueParams params,
5252
final TablePartitioner<Bytes, Integer> partitioner
5353
) {
54-
RemoteTableSpec<Bytes, Integer> spec =
55-
new BaseTableSpec<>(params.name().tableName(), partitioner);
54+
RemoteTableSpec spec = new BaseTableSpec(params.name().tableName(), partitioner);
5655

5756
if (params.timeToLive().isPresent()) {
58-
spec = new TtlTableSpec<>(spec, params.timeToLive().get());
57+
spec = new TtlTableSpec(spec, params.timeToLive().get());
5958
}
6059

6160
if (params.schemaType() == SchemaTypes.KVSchema.FACT) {
62-
spec = new TimeWindowedCompactionTableSpec<>(spec);
61+
spec = new TimeWindowedCompactionTableSpec(spec);
6362
}
6463

6564
return spec;
6665
}
6766

68-
public static RemoteTableSpec<WindowedKey, SegmentPartition> fromWindowParams(
67+
public static RemoteTableSpec fromWindowParams(
6968
final ResponsiveWindowParams params,
7069
final TablePartitioner<WindowedKey, SegmentPartition> partitioner
7170
) {
72-
return new BaseTableSpec<>(params.name().tableName(), partitioner);
71+
return new BaseTableSpec(params.name().tableName(), partitioner);
7372
}
7473

7574
}

kafka-client/src/main/java/dev/responsive/kafka/internal/db/SessionTableCache.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616

1717
package dev.responsive.kafka.internal.db;
1818

19-
import dev.responsive.kafka.internal.db.partitioning.Segmenter;
2019
import dev.responsive.kafka.internal.db.partitioning.SessionSegmentPartitioner;
2120
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
22-
import dev.responsive.kafka.internal.utils.SessionKey;
2321
import java.util.HashMap;
2422
import java.util.Map;
2523
import java.util.concurrent.TimeoutException;
@@ -30,10 +28,8 @@ public class SessionTableCache<T extends RemoteTable<?, ?>> {
3028

3129
@FunctionalInterface
3230
public interface Factory<T> {
33-
T create(
34-
final RemoteTableSpec<SessionKey, Segmenter.SegmentPartition> spec,
35-
SessionSegmentPartitioner partitioner
36-
) throws InterruptedException, TimeoutException;
31+
T create(final RemoteTableSpec spec, SessionSegmentPartitioner partitioner)
32+
throws InterruptedException, TimeoutException;
3733
}
3834

3935
private final Map<String, T> tables = new HashMap<>();
@@ -47,10 +43,7 @@ public SessionTableCache(final Factory<T> factory) {
4743
* Creates a table with the supplied {@code tableName} with the
4844
* desired schema.
4945
*/
50-
public synchronized T create(
51-
RemoteTableSpec<SessionKey, Segmenter.SegmentPartition> spec,
52-
SessionSegmentPartitioner partitioner
53-
)
46+
public synchronized T create(RemoteTableSpec spec, SessionSegmentPartitioner partitioner)
5447
throws InterruptedException, TimeoutException {
5548
final T existing = tables.get(spec.tableName());
5649
if (existing != null) {

kafka-client/src/main/java/dev/responsive/kafka/internal/db/TableCache.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,26 +28,26 @@
2828
* table are only prepared once during the lifetime of the application.
2929
*/
3030
@ThreadSafe
31-
public class TableCache<K, V, T extends RemoteTable<?, ?>> {
31+
public class TableCache<T extends RemoteTable<?, ?>> {
3232

3333
@FunctionalInterface
34-
public interface Factory<K, V, T> {
35-
T create(final RemoteTableSpec<K, V> spec)
34+
public interface Factory<T> {
35+
T create(final RemoteTableSpec spec)
3636
throws InterruptedException, TimeoutException;
3737
}
3838

3939
private final Map<String, T> tables = new HashMap<>();
40-
private final Factory<K, V, T> factory;
40+
private final Factory<T> factory;
4141

42-
public TableCache(final Factory<K, V, T> factory) {
42+
public TableCache(final Factory<T> factory) {
4343
this.factory = factory;
4444
}
4545

4646
/**
4747
* Creates a table with the supplied {@code tableName} with the
4848
* desired schema.
4949
*/
50-
public synchronized T create(RemoteTableSpec<K, V> spec)
50+
public synchronized T create(RemoteTableSpec spec)
5151
throws InterruptedException, TimeoutException {
5252
final T existing = tables.get(spec.tableName());
5353
if (existing != null) {

kafka-client/src/main/java/dev/responsive/kafka/internal/db/WindowedTableCache.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616

1717
package dev.responsive.kafka.internal.db;
1818

19-
import dev.responsive.kafka.internal.db.partitioning.Segmenter;
2019
import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner;
2120
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
22-
import dev.responsive.kafka.internal.utils.WindowedKey;
2321
import java.util.HashMap;
2422
import java.util.Map;
2523
import java.util.concurrent.TimeoutException;
@@ -30,10 +28,7 @@ public class WindowedTableCache<T extends RemoteTable<?, ?>> {
3028

3129
@FunctionalInterface
3230
public interface Factory<T> {
33-
T create(
34-
final RemoteTableSpec<WindowedKey, Segmenter.SegmentPartition> spec,
35-
WindowSegmentPartitioner partitioner
36-
)
31+
T create(final RemoteTableSpec spec, WindowSegmentPartitioner partitioner)
3732
throws InterruptedException, TimeoutException;
3833
}
3934

@@ -48,10 +43,7 @@ public WindowedTableCache(final Factory<T> factory) {
4843
* Creates a table with the supplied {@code tableName} with the
4944
* desired schema.
5045
*/
51-
public synchronized T create(
52-
RemoteTableSpec<WindowedKey, Segmenter.SegmentPartition> spec,
53-
WindowSegmentPartitioner partitioner
54-
)
46+
public synchronized T create(RemoteTableSpec spec, WindowSegmentPartitioner partitioner)
5547
throws InterruptedException, TimeoutException {
5648
final T existing = tables.get(spec.tableName());
5749
if (existing != null) {

0 commit comments

Comments
 (0)